Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Implement logsink for the system event logs. #46143

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,8 @@ func (s *Server) Start(ctx context.Context) error {

// Start garbage collecting system events.
s.startSystemLogsGC(ctx)
// Start system.eventlog sink to local logs
s.startEventlogSink(ctx)

// Serve UI assets.
//
Expand Down
194 changes: 194 additions & 0 deletions pkg/server/server_systemlog_logsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

var eventlogSinkEnabled = settings.RegisterBoolSetting(
"server.eventlogsink.enabled",
`set to true to enable system.eventlog entries to be emitted to the node log`,
false,
)

var eventlogSinkPeriod = settings.RegisterDurationSetting(
"server.eventlogsink.period",
`how frequently should the eventlog table be polled`,
10*time.Second,
)

var eventlogSinkMaxEntries = settings.RegisterIntSetting(
"server.eventlogsink.max_entries",
`max number of eventlog entries to emit; set to 0 to emit all`,
100,
)

var eventlogSincIncludeEvents = settings.RegisterStringSetting(
"server.eventlogsink.include_events",
`extract events matching this regex`,
".*",
)

var eventlogSincExcludeEvents = settings.RegisterStringSetting(
"server.eventlogsink.exclude_events",
`exclude events matching this regex`,
"comment|sequence",
)

func getEventlogSinkPeriod(sv *settings.Values) time.Duration {
if !eventlogSinkEnabled.Get(sv) {
// If log sink is not enabled, recheck after a minute.
return time.Minute
}

p := eventlogSinkPeriod.Get(sv)
if p < time.Second {
return time.Second
}
return p
}

// eventLogSink is just a wrapper around log methods to facilitate testing.
type eventLogSink interface {
ReportEvent(ctx context.Context, ts time.Time, event string, reporter string, info string)
Errorf(ctx context.Context, format string, args ...interface{})
}

type defaultLogSink struct{}

var _ eventLogSink = &defaultLogSink{}

func (d defaultLogSink) ReportEvent(
ctx context.Context, ts time.Time, event string, reporter string, info string,
) {
log.Infof(ctx, "system.eventlog:n=%s:%s:%s %s", reporter, event, ts, info)
}

func (d defaultLogSink) Errorf(ctx context.Context, format string, args ...interface{}) {
log.Errorf(ctx, format, args...)
}

// sinkEventLog emits system.eventlog entries into the node log file.
// TODO(yevgeniy): Replace implementation with rangefeed based one, once rangefeeds
// support listening to system tables.
func sinkEventlog(
ctx context.Context,
sink eventLogSink,
db *kv.DB,
ex *sql.InternalExecutor,
sv *settings.Values,
logTimestamp time.Time,
) (time.Time, error) {
maxEntries := eventlogSinkMaxEntries.Get(sv)
var limit interface{}
if maxEntries > 0 {
limit = maxEntries + 1
}
includeEvents := eventlogSincIncludeEvents.Get(sv)
excludeEvents := eventlogSincExcludeEvents.Get(sv)

const (
tsCol = iota
eventTypeCol
reportingIDCol
infoCol
)

const getEventsQuery = `
SELECT timestamp, "eventType", "reportingID", info
FROM [
SELECT * FROM system.eventlog
WHERE
timestamp > $1 AND
regexp_extract("eventType", $2) IS NOT NULL AND -- include events
regexp_extract("eventType", $3) IS NULL -- exclude events
ORDER BY timestamp DESC
LIMIT $4
] ORDER BY timestamp`

maxTs := logTimestamp
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
rows, err := ex.QueryEx(
ctx,
"system.eventlog-sink",
txn,
sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser},
getEventsQuery,
logTimestamp,
includeEvents,
excludeEvents,
limit,
)

if err != nil {
return err
}

if maxEntries > 0 && int64(len(rows)) > maxEntries {
rows = rows[1:] // Drop oldest event
log.Infof(ctx,
"system.eventlog likely contains more than %d events emitted since %s; "+
"consider increasing server.eventlogsink.max_entries setting, "+
"or lowering the server.eventlogsink.period",
maxEntries, logTimestamp,
)
}

for _, row := range rows {
ts, ok := row[tsCol].(*tree.DTimestamp)
if !ok {
return errors.Errorf("timestamp is of unknown type %T", rows[0])
}

maxTs = ts.Time
sink.ReportEvent(
ctx, maxTs, row[eventTypeCol].String(), row[reportingIDCol].String(), row[infoCol].String())
}
return nil
})

return maxTs, err
}

func (s *Server) startEventlogSink(ctx context.Context) {
logTimestamp := time.Now()

s.stopper.RunWorker(ctx, func(ctx context.Context) {
sv := &s.ClusterSettings().SV
for {
select {
case <-s.stopper.ShouldStop():
return
case <-time.After(getEventlogSinkPeriod(sv)):
if eventlogSinkEnabled.Get(sv) {
newTs, err := sinkEventlog(ctx, defaultLogSink{}, s.db, s.internalExecutor, sv, logTimestamp)
if err == nil {
logTimestamp = newTs
} else {
log.Errorf(ctx, "failed to sink system.eventlog records: %v", err)
}
}
}
}
})
}
133 changes: 133 additions & 0 deletions pkg/server/server_systemlog_logsink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"database/sql"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

type testLogSink struct {
server *TestServer
db *sql.DB
events []string
haveErr bool
}

var _ eventLogSink = &testLogSink{}

func (t *testLogSink) ReportEvent(
ctx context.Context, ts time.Time, event string, reporter string, info string,
) {
t.events = append(t.events, event)
}

func (t *testLogSink) Errorf(ctx context.Context, format string, args ...interface{}) {
t.haveErr = true
}

func (t *testLogSink) doSink(ts time.Time) (time.Time, error) {
t.events = nil
t.haveErr = false
return sinkEventlog(context.Background(),
t, t.server.db, t.server.internalExecutor, &t.server.ClusterSettings().SV, ts)
}

func addEvents(t *testing.T, sqlDB *sqlutils.SQLRunner, startTs time.Time, num int) time.Time {
for i := 0; i < num; i++ {
startTs = startTs.Add(time.Second)
sqlDB.Exec(t,
`INSERT INTO system.eventlog (
"timestamp", "eventType", "targetID", "reportingID"
) VALUES ($1, $2, $3, $4)`,
startTs, fmt.Sprintf("test_event-%d", i), i, i,
)
}
return startTs
}

func TestSystemEventLogSink(t *testing.T) {
defer leaktest.AfterTest(t)()

st := cluster.MakeTestingClusterSettings()
updater := st.MakeUpdater()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Settings: st,
})
sqlDB := sqlutils.MakeSQLRunner(db)
ts := s.(*TestServer)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

sink := &testLogSink{server: ts, db: db}

// Write some events to the event table.
startTs := timeutil.Now()
expectedEndTs := addEvents(t, sqlDB, startTs, 10)

// Scan all entries.
require.NoError(t, updater.Set("server.eventlogsink.max_entries", "-1", "i"))

// Since 'set cluster setting' also logs to the events table,
// arrange only for 'test_event' events to be picked up.
require.NoError(t,
updater.Set("server.eventlogsink.include_events", "test_event", "s"))

// Verify we have 10 events.
endTs, err := sink.doSink(startTs)
require.NoError(t, err)
require.Equal(t, expectedEndTs, endTs)
require.Equal(t, 10, len(sink.events))
require.False(t, sink.haveErr)

// Limit the number of events.
require.NoError(t, updater.Set("server.eventlogsink.max_entries", "3", "i"))

// Verify we get 3 latest events and ignore 7 older ones.
endTs, err = sink.doSink(startTs)
require.NoError(t, err)
require.Equal(t, expectedEndTs, endTs)
require.False(t, sink.haveErr)
require.Equal(t, []string{"'test_event-7'", "'test_event-8'", "'test_event-9'"}, sink.events)

// Exclude test_event 7 & 9
require.NoError(t,
updater.Set("server.eventlogsink.exclude_events", "test_event-(7|9)", "s"))
endTs, err = sink.doSink(startTs)
require.NoError(t, err)
require.Equal(t, []string{"'test_event-5'", "'test_event-6'", "'test_event-8'"}, sink.events)
require.Equal(t, startTs.Add(9*time.Second), endTs)
require.False(t, sink.haveErr)

// Query events starting from the previous endTs -- nothing should be returned.
oldEndTs := endTs
endTs, err = sink.doSink(oldEndTs)
require.NoError(t, err)
require.EqualValues(t, oldEndTs, endTs)
require.Nil(t, sink.events)

// Add a new event and make sure it is retrieved.
expectedEndTs = addEvents(t, sqlDB, endTs, 1)
endTs, err = sink.doSink(endTs)
require.NoError(t, err)
require.EqualValues(t, []string{"'test_event-0'"}, sink.events)
}