Skip to content

Commit

Permalink
server: Implement logsink for the system event logs.
Browse files Browse the repository at this point in the history
Fixes cockroachdb#45643

Cockroach server logs important system events into the eventlog table.
These events are exposed on the web UI.  However, the operators often
want to see those global events while tailing a log file on a single
node.

Implement a mechanism for the server running on each node
to emit those system events into server log file.

If the system log scanning is enabled (via server.eventlogsink.enabled setting),
then each node scans the system log table periodically,
every server.eventlogsink.period period;

For example, below is a single system event emitted to the regular log file.:
  I200323 .... [n1] system.eventlog:n=1:'set_cluster_setting':2020-03-23 19:24:29.948279
    +0000 UTC '{"SettingName":"server.eventlogsink.max_entries","Value":"101","User":"root"}'

There is no guaranteed that all events from system log will be eimitted.
In particular, upon node restart, we only emit events that were generated
from that point on.  Also, if for whatever reason,we start emitting
too many system log messages, then only up to the
server.eventlogsink.max_entries (default 100) recent events will be emitted.
If we think we have "dropped" some events due to confuration settings,
we will indicate so in the log.

The administrators may choose to restrict the set of events emitted
by changing server.eventlogsink.include_events and/or
server.eventlogsink.exclude_events settings.  These settings specify
regular expressions to include or exclude events with matching event
types.

Release notes (feature): Log system wide events into cockroach.log
file on every node.

This feature allows the administrator logged in into one of the
nodes to monitor that nodes log file and see important "system" events,
such as table/index creationg, schema change jobs, etc.

To use this feature, the server.eventlogsink.enabled setting needs
to be set to true.
  • Loading branch information
Yevgeniy Miretskiy committed Apr 1, 2020
1 parent 34bb7b1 commit eae879d
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,8 @@ func (s *Server) Start(ctx context.Context) error {

// Start garbage collecting system events.
s.startSystemLogsGC(ctx)
// Start system.eventlog sink to local logs
s.startEventlogSink(ctx)

// Serve UI assets.
//
Expand Down
194 changes: 194 additions & 0 deletions pkg/server/server_systemlog_logsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

var eventlogSinkEnabled = settings.RegisterBoolSetting(
"server.eventlogsink.enabled",
`set to true to enable system.eventlog entries to be emitted to the node log`,
false,
)

var eventlogSinkPeriod = settings.RegisterDurationSetting(
"server.eventlogsink.period",
`how frequently should the eventlog table be polled`,
10*time.Second,
)

var eventlogSinkMaxEntries = settings.RegisterIntSetting(
"server.eventlogsink.max_entries",
`max number of eventlog entries to emit; set to 0 to emit all`,
100,
)

var eventlogSincIncludeEvents = settings.RegisterStringSetting(
"server.eventlogsink.include_events",
`extract events matching this regex`,
".*",
)

var eventlogSincExcludeEvents = settings.RegisterStringSetting(
"server.eventlogsink.exclude_events",
`exclude events matching this regex`,
"comment|sequence",
)

func getEventlogSinkPeriod(sv *settings.Values) time.Duration {
if !eventlogSinkEnabled.Get(sv) {
// If log sink is not enabled, recheck after a minute.
return time.Minute
}

p := eventlogSinkPeriod.Get(sv)
if p < time.Second {
return time.Second
}
return p
}

// eventLogSink is just a wrapper around log methods to facilitate testing.
type eventLogSink interface {
ReportEvent(ctx context.Context, ts time.Time, event string, reporter string, info string)
Errorf(ctx context.Context, format string, args ...interface{})
}

type defaultLogSink struct{}

var _ eventLogSink = &defaultLogSink{}

func (d defaultLogSink) ReportEvent(
ctx context.Context, ts time.Time, event string, reporter string, info string,
) {
log.Infof(ctx, "system.eventlog:n=%s:%s:%s %s", reporter, event, ts, info)
}

func (d defaultLogSink) Errorf(ctx context.Context, format string, args ...interface{}) {
log.Errorf(ctx, format, args...)
}

// sinkEventLog emits system.eventlog entries into the node log file.
// TODO(yevgeniy): Replace implementation with rangefeed based one, once rangefeeds
// support listening to system tables.
func sinkEventlog(
ctx context.Context,
sink eventLogSink,
db *kv.DB,
ex *sql.InternalExecutor,
sv *settings.Values,
logTimestamp time.Time,
) (time.Time, error) {
maxEntries := eventlogSinkMaxEntries.Get(sv)
var limit interface{}
if maxEntries > 0 {
limit = maxEntries + 1
}
includeEvents := eventlogSincIncludeEvents.Get(sv)
excludeEvents := eventlogSincExcludeEvents.Get(sv)

const (
tsCol = iota
eventTypeCol
reportingIDCol
infoCol
)

const getEventsQuery = `
SELECT timestamp, "eventType", "reportingID", info
FROM [
SELECT * FROM system.eventlog
WHERE
timestamp > $1 AND
regexp_extract("eventType", $2) IS NOT NULL AND -- include events
regexp_extract("eventType", $3) IS NULL -- exclude events
ORDER BY timestamp DESC
LIMIT $4
] ORDER BY timestamp`

maxTs := logTimestamp
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
rows, err := ex.QueryEx(
ctx,
"system.eventlog-sink",
txn,
sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser},
getEventsQuery,
logTimestamp,
includeEvents,
excludeEvents,
limit,
)

if err != nil {
return err
}

if maxEntries > 0 && int64(len(rows)) > maxEntries {
rows = rows[1:] // Drop oldest event
log.Infof(ctx,
"system.eventlog likely contains more than %d events emitted since %s; "+
"consider increasing server.eventlogsink.max_entries setting, "+
"or lowering the server.eventlogsink.period",
maxEntries, logTimestamp,
)
}

for _, row := range rows {
ts, ok := row[tsCol].(*tree.DTimestamp)
if !ok {
return errors.Errorf("timestamp is of unknown type %T", rows[0])
}

maxTs = ts.Time
sink.ReportEvent(
ctx, maxTs, row[eventTypeCol].String(), row[reportingIDCol].String(), row[infoCol].String())
}
return nil
})

return maxTs, err
}

func (s *Server) startEventlogSink(ctx context.Context) {
logTimestamp := time.Now()

s.stopper.RunWorker(ctx, func(ctx context.Context) {
sv := &s.ClusterSettings().SV
for {
select {
case <-s.stopper.ShouldStop():
return
case <-time.After(getEventlogSinkPeriod(sv)):
if eventlogSinkEnabled.Get(sv) {
newTs, err := sinkEventlog(ctx, defaultLogSink{}, s.db, s.internalExecutor, sv, logTimestamp)
if err == nil {
logTimestamp = newTs
} else {
log.Errorf(ctx, "failed to sink system.eventlog records: %v", err)
}
}
}
}
})
}
133 changes: 133 additions & 0 deletions pkg/server/server_systemlog_logsink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"database/sql"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

type testLogSink struct {
server *TestServer
db *sql.DB
events []string
haveErr bool
}

var _ eventLogSink = &testLogSink{}

func (t *testLogSink) ReportEvent(
ctx context.Context, ts time.Time, event string, reporter string, info string,
) {
t.events = append(t.events, event)
}

func (t *testLogSink) Errorf(ctx context.Context, format string, args ...interface{}) {
t.haveErr = true
}

func (t *testLogSink) doSink(ts time.Time) (time.Time, error) {
t.events = nil
t.haveErr = false
return sinkEventlog(context.Background(),
t, t.server.db, t.server.internalExecutor, &t.server.ClusterSettings().SV, ts)
}

func addEvents(t *testing.T, sqlDB *sqlutils.SQLRunner, startTs time.Time, num int) time.Time {
for i := 0; i < num; i++ {
startTs = startTs.Add(time.Second)
sqlDB.Exec(t,
`INSERT INTO system.eventlog (
"timestamp", "eventType", "targetID", "reportingID"
) VALUES ($1, $2, $3, $4)`,
startTs, fmt.Sprintf("test_event-%d", i), i, i,
)
}
return startTs
}

func TestSystemEventLogSink(t *testing.T) {
defer leaktest.AfterTest(t)()

st := cluster.MakeTestingClusterSettings()
updater := st.MakeUpdater()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Settings: st,
})
sqlDB := sqlutils.MakeSQLRunner(db)
ts := s.(*TestServer)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

sink := &testLogSink{server: ts, db: db}

// Write some events to the event table.
startTs := timeutil.Now()
expectedEndTs := addEvents(t, sqlDB, startTs, 10)

// Scan all entries.
require.NoError(t, updater.Set("server.eventlogsink.max_entries", "-1", "i"))

// Since 'set cluster setting' also logs to the events table,
// arrange only for 'test_event' events to be picked up.
require.NoError(t,
updater.Set("server.eventlogsink.include_events", "test_event", "s"))

// Verify we have 10 events.
endTs, err := sink.doSink(startTs)
require.NoError(t, err)
require.Equal(t, expectedEndTs, endTs)
require.Equal(t, 10, len(sink.events))
require.False(t, sink.haveErr)

// Limit the number of events.
require.NoError(t, updater.Set("server.eventlogsink.max_entries", "3", "i"))

// Verify we get 3 latest events and ignore 7 older ones.
endTs, err = sink.doSink(startTs)
require.NoError(t, err)
require.Equal(t, expectedEndTs, endTs)
require.False(t, sink.haveErr)
require.Equal(t, []string{"'test_event-7'", "'test_event-8'", "'test_event-9'"}, sink.events)

// Exclude test_event 7 & 9
require.NoError(t,
updater.Set("server.eventlogsink.exclude_events", "test_event-(7|9)", "s"))
endTs, err = sink.doSink(startTs)
require.NoError(t, err)
require.Equal(t, []string{"'test_event-5'", "'test_event-6'", "'test_event-8'"}, sink.events)
require.Equal(t, startTs.Add(9*time.Second), endTs)
require.False(t, sink.haveErr)

// Query events starting from the previous endTs -- nothing should be returned.
oldEndTs := endTs
endTs, err = sink.doSink(oldEndTs)
require.NoError(t, err)
require.EqualValues(t, oldEndTs, endTs)
require.Nil(t, sink.events)

// Add a new event and make sure it is retrieved.
expectedEndTs = addEvents(t, sqlDB, endTs, 1)
endTs, err = sink.doSink(endTs)
require.NoError(t, err)
require.EqualValues(t, []string{"'test_event-0'"}, sink.events)
}

0 comments on commit eae879d

Please sign in to comment.