From a1281552e4ecb5650d5d324c0a7eaac9387070e8 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 15:38:27 +0200 Subject: [PATCH 1/8] sql: simplify the internal event logging APIs Prior to this patch, there was a little mess in `sql/event_log.go` that had been introduced when "optimizing" GRANT/REVOKE to only use one write batch for all the events logged: - the API interface for the functions in event_log.go were complex to use, requiring callers to provide descriptor IDs and structured events as separate slices; - the optimizing logic was not properly applied to the other case where multiple events are emitted: SQL audit logging in `exec_log.go`. This patch cleans this up by using the same struct `eventLogEntry` as argument to the various APIs. Release note: None --- pkg/sql/create_stats.go | 9 ++-- pkg/sql/event_log.go | 97 ++++++++++++++++++++++------------------- pkg/sql/exec_log.go | 30 +++++++------ pkg/sql/grant_revoke.go | 35 ++++++--------- 4 files changed, 88 insertions(+), 83 deletions(-) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 00b030728aaa..50b28a9ac5d2 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -576,16 +576,17 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er // See: https://github.com/cockroachdb/cockroach/issues/57739 return evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return logEventInternalForSQLStatements(ctx, evalCtx.ExecCfg, txn, - descpb.IDs{details.Table.ID}, evalCtx.SessionData.User(), evalCtx.SessionData.ApplicationName, details.Statement, "CREATE STATISTICS", nil, /* no placeholders known at this point */ true, /* writeToEventLog */ - &eventpb.CreateStatistics{ - TableName: details.FQTableName, - }, + eventLogEntry{ + targetID: int32(details.Table.ID), + event: &eventpb.CreateStatistics{ + TableName: details.FQTableName, + }}, ) }) } diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 
74e60b7a9e21..6d4be2eaf848 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -30,28 +30,39 @@ import ( "github.com/cockroachdb/errors" ) +// eventLogEntry represents a SQL-level event to be sent to logging +// output(s). +type eventLogEntry struct { + // targetID is the main object affected by this event. + // For DDL statements, this is typically the ID of + // the affected descriptor. + targetID int32 + + // event is the main event payload. + event eventpb.EventPayload +} + // logEvent emits a cluster event in the context of a regular SQL // statement. func (p *planner) logEvent( - ctx context.Context, descID descpb.ID, events eventpb.EventPayload, + ctx context.Context, descID descpb.ID, event eventpb.EventPayload, ) error { - return p.logEventsWithSystemEventLogOption(ctx, descpb.IDs{descID}, true /* writeToEventLog */, events) + return p.logEventsWithSystemEventLogOption(ctx, true, /* writeToEventLog */ + eventLogEntry{targetID: int32(descID), event: event}) } -// batchLogEvents is like logEvent, except it takes in slice of events -// to batch write. -func (p *planner) batchLogEvents( - ctx context.Context, descIDs descpb.IDs, events ...eventpb.EventPayload, -) error { - return p.logEventsWithSystemEventLogOption(ctx, descIDs, true /* writeToEventLog */, events...) +// logEvents is like logEvent, except that it can write multiple +// events simultaneously. This is advantageous for SQL statements +// that produce multiple events, e.g. GRANT, as they will +// processed using only one write batch (and thus lower latency). +func (p *planner) logEvents(ctx context.Context, entries ...eventLogEntry) error { + return p.logEventsWithSystemEventLogOption(ctx, true /* writeToEventLog */, entries...) 
} -func (p *planner) logEventOnlyExternally( - ctx context.Context, descID descpb.ID, event eventpb.EventPayload, -) { +func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventLogEntry) { // The API contract for logEventWithSystemEventLogOption() is that it returns // no error when system.eventlog is not written to. - _ = p.logEventsWithSystemEventLogOption(ctx, descpb.IDs{descID}, false /* writeToEventLog */, event) + _ = p.logEventsWithSystemEventLogOption(ctx, false /* writeToEventLog */, entries...) } // logEventsWithSystemEventLogOption is like logEvent() but it gives @@ -61,14 +72,14 @@ func (p *planner) logEventOnlyExternally( // If writeToEventLog is false, this function guarantees that it // returns no error. func (p *planner) logEventsWithSystemEventLogOption( - ctx context.Context, descIDs descpb.IDs, writeToEventLog bool, events ...eventpb.EventPayload, + ctx context.Context, writeToEventLog bool, entries ...eventLogEntry, ) error { user := p.User() stmt := tree.AsStringWithFQNames(p.stmt.AST, p.extendedEvalCtx.EvalContext.Annotations) stmtTag := p.stmt.AST.StatementTag() pl := p.extendedEvalCtx.EvalContext.Placeholders.Values appName := p.SessionData().ApplicationName - return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, descIDs, user, appName, stmt, stmtTag, pl, writeToEventLog, events...) + return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, user, appName, stmt, stmtTag, pl, writeToEventLog, entries...) } // logEventInternalForSchemaChange emits a cluster event in the @@ -116,19 +127,18 @@ func logEventInternalForSQLStatements( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, - descIDs descpb.IDs, user security.SQLUsername, appName string, stmt string, stmtTag string, placeholders tree.QueryArguments, writeToEventLog bool, - events ...eventpb.EventPayload, + entries ...eventLogEntry, ) error { // Inject the common fields into the payload provided by the caller. 
- for i := range events { + for i := range entries { if err := injectCommonFields( - txn, descIDs[i], user, appName, stmt, stmtTag, placeholders, events[i], + txn, entries[i].targetID, user, appName, stmt, stmtTag, placeholders, entries[i].event, ); err != nil { return err } @@ -136,21 +146,20 @@ func logEventInternalForSQLStatements( // Delegate the storing of the event to the regular event logic. if !writeToEventLog || !eventLogEnabled.Get(&execCfg.InternalExecutor.s.cfg.Settings.SV) { - return skipWritePath(ctx, txn, events, !writeToEventLog) + return skipWritePath(ctx, txn, entries, !writeToEventLog) } return batchInsertEventRecords(ctx, execCfg.InternalExecutor, txn, - descIDs, int32(execCfg.NodeID.SQLInstanceID()), - events..., + entries..., ) } // injectCommonFields injects the common fields into the event payload provided by the caller. func injectCommonFields( txn *kv.Txn, - descID descpb.ID, + descID int32, user security.SQLUsername, appName string, stmt string, @@ -247,14 +256,15 @@ func InsertEventRecord( onlyLog bool, ) error { if onlyLog || !eventLogEnabled.Get(&ex.s.cfg.Settings.SV) { - return skipWritePath(ctx, txn, []eventpb.EventPayload{info}, onlyLog) + return skipWritePath(ctx, txn, []eventLogEntry{{event: info}}, onlyLog) } return batchInsertEventRecords( ctx, ex, txn, - descpb.IDs{descpb.ID(targetID)}, reportingID, - info, - ) + eventLogEntry{ + targetID: targetID, + event: info, + }) } // batchInsertEventRecords is like InsertEventRecord except it takes @@ -265,9 +275,8 @@ func batchInsertEventRecords( ctx context.Context, ex *InternalExecutor, txn *kv.Txn, - descIDs descpb.IDs, reportingID int32, - events ...eventpb.EventPayload, + entries ...eventLogEntry, ) error { const colsPerEvent = 5 const baseQuery = ` @@ -275,22 +284,22 @@ INSERT INTO system.eventlog ( timestamp, "eventType", "targetID", "reportingID", info ) VALUES($1, $2, $3, $4, $5)` - args := make([]interface{}, 0, len(events)*colsPerEvent) + args := make([]interface{}, 0, 
len(entries)*colsPerEvent) // Prepare first row so we can take the fast path if we're only inserting one event log. if err := prepareRow( - ctx, txn, &args, events[0], descIDs[0], reportingID, + ctx, txn, &args, entries[0].event, entries[0].targetID, reportingID, ); err != nil { return err } - if len(events) == 1 { - return execEventLogInsert(ctx, ex, txn, baseQuery, args, len(events)) + if len(entries) == 1 { + return execEventLogInsert(ctx, ex, txn, baseQuery, args, len(entries)) } var additionalRows strings.Builder - for i := 1; i < len(events); i++ { + for i := 1; i < len(entries); i++ { var placeholderNum = 1 + (i * colsPerEvent) - if err := prepareRow(ctx, txn, &args, events[i], descIDs[i], reportingID); err != nil { + if err := prepareRow(ctx, txn, &args, entries[i].event, entries[i].targetID, reportingID); err != nil { return err } additionalRows.WriteString(fmt.Sprintf(", ($%d, $%d, $%d, $%d, $%d)", @@ -301,20 +310,18 @@ VALUES($1, $2, $3, $4, $5)` if err != nil { return err } - if rows != len(events) { - return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(events)) + if rows != len(entries) { + return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(entries)) } return nil } // skipWritePath is used when either onlyLog is true, or writes to the event log // table are disabled. In these cases, we do not write to the event log table. 
-func skipWritePath( - ctx context.Context, txn *kv.Txn, events []eventpb.EventPayload, onlyLog bool, -) error { - for i := range events { +func skipWritePath(ctx context.Context, txn *kv.Txn, entries []eventLogEntry, onlyLog bool) error { + for i := range entries { if err := setupEventAndMaybeLog( - ctx, txn, events[i], onlyLog, + ctx, txn, entries[i].event, onlyLog, ); err != nil { return err } @@ -357,13 +364,13 @@ func constructArgs( args *[]interface{}, event eventpb.EventPayload, eventType string, - descID descpb.ID, + targetID int32, reportingID int32, ) error { *args = append( *args, timeutil.Unix(0, event.CommonDetails().Timestamp), - eventType, int32(descID), + eventType, targetID, reportingID, ) var info interface{} @@ -405,7 +412,7 @@ func prepareRow( txn *kv.Txn, args *[]interface{}, event eventpb.EventPayload, - descID descpb.ID, + targetID int32, reportingID int32, ) error { // Setup event log. @@ -417,7 +424,7 @@ func prepareRow( } // Construct the args for this row. - if err := constructArgs(args, event, eventType, descID, reportingID); err != nil { + if err := constructArgs(args, event, eventType, targetID, reportingID); err != nil { return err } return nil diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 75a3a3cf62f3..5c7474beaf18 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -283,7 +283,8 @@ func (p *planner) maybeLogStatementInternal( if auditEventsDetected { // TODO(knz): re-add the placeholders and age into the logging event. 
- for _, ev := range p.curPlan.auditEvents { + entries := make([]eventLogEntry, len(p.curPlan.auditEvents)) + for i, ev := range p.curPlan.auditEvents { mode := "r" if ev.writing { mode = "rw" @@ -303,15 +304,18 @@ func (p *planner) maybeLogStatementInternal( tableName = tn.FQString() } } - - p.logEventOnlyExternally(ctx, ev.desc.GetID(), - &eventpb.SensitiveTableAccess{ + entries[i] = eventLogEntry{ + targetID: int32(ev.desc.GetID()), + event: &eventpb.SensitiveTableAccess{ CommonSQLExecDetails: execDetails, TableName: tableName, AccessMode: mode, - }) + }, + } } + p.logEventsOnlyExternally(ctx, entries...) } + if slowQueryLogEnabled && ( // Did the user request pumping queries into the slow query log when // the logical plan has full scans? @@ -321,25 +325,25 @@ func (p *planner) maybeLogStatementInternal( switch { case execType == executorTypeExec: // Non-internal queries are always logged to the slow query log. - p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, + eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}}) case execType == executorTypeInternal && slowInternalQueryLogEnabled: // Internal queries that surpass the slow query log threshold should only // be logged to the slow-internal-only log if the cluster setting dictates. 
- p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, + eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}}) } } if logExecuteEnabled { - p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.QueryExecute{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, + eventLogEntry{event: &eventpb.QueryExecute{CommonSQLExecDetails: execDetails}}) } if shouldLogToAdminAuditLog { - p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, + eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}}) } } diff --git a/pkg/sql/grant_revoke.go b/pkg/sql/grant_revoke.go index 1d9a7b1ec5d0..ad5ebfe90a15 100644 --- a/pkg/sql/grant_revoke.go +++ b/pkg/sql/grant_revoke.go @@ -177,12 +177,7 @@ func (n *changePrivilegesNode) startExec(params runParams) error { return err } - // The events to log at the end. - type eventEntry struct { - descID descpb.ID - event eventpb.EventPayload - } - var events []eventEntry + var events []eventLogEntry // First, update the descriptors. We want to catch all errors before // we update them in KV below. @@ -239,8 +234,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. 
privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeDatabasePrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeDatabasePrivilege{ CommonSQLPrivilegeEventDetails: privs, DatabaseName: (*tree.Name)(&d.Name).String(), }}) @@ -264,8 +260,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeTablePrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeTablePrivilege{ CommonSQLPrivilegeEventDetails: privs, TableName: d.Name, // FIXME }}) @@ -278,8 +275,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeTypePrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeTypePrivilege{ CommonSQLPrivilegeEventDetails: privs, TypeName: d.Name, // FIXME }}) @@ -295,8 +293,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeSchemaPrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeSchemaPrivilege{ CommonSQLPrivilegeEventDetails: privs, SchemaName: d.Name, // FIXME }}) @@ -312,13 +311,7 @@ func (n *changePrivilegesNode) startExec(params runParams) error { // Record the privilege changes in the event log. 
This is an // auditable log event and is recorded in the same transaction as // the table descriptor update. - descIDs := make(descpb.IDs, 0, len(events)) - eventPayloads := make([]eventpb.EventPayload, 0, len(events)) - for _, ev := range events { - descIDs = append(descIDs, ev.descID) - eventPayloads = append(eventPayloads, ev.event) - } - if err := params.p.batchLogEvents(params.ctx, descIDs, eventPayloads...); err != nil { + if err := params.p.logEvents(params.ctx, events...); err != nil { return err } return nil From 9318af940f86079628584ef146bb8e0bd71d505f Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 15:53:55 +0200 Subject: [PATCH 2/8] sql: use a single code path for both single-event and multi-event logging The previous patch to batch event writes for GRANT/REVOKE had duplicated code. This was not necessary. This patch fixes this by using the same code for both cases. Release note: None --- pkg/sql/event_log.go | 278 +++++++++++++++++-------------------------- 1 file changed, 106 insertions(+), 172 deletions(-) diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 6d4be2eaf848..4b236129c596 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -104,14 +104,16 @@ func logEventInternalForSchemaChanges( m.MutationID = uint32(mutationID) // Delegate the storing of the event to the regular event logic. - return InsertEventRecord( + return insertEventRecords( ctx, execCfg.InternalExecutor, txn, - int32(descID), - int32(execCfg.NodeID.SQLInstanceID()), - false, /* skipExternalLog */ - event, - false, /* onlyLog */ + int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + false, /* skipExternalLog */ + false, /* onlyLog */ + eventLogEntry{ + targetID: int32(descID), + event: event, + }, ) } @@ -144,14 +146,11 @@ func logEventInternalForSQLStatements( } } - // Delegate the storing of the event to the regular event logic. 
- if !writeToEventLog || !eventLogEnabled.Get(&execCfg.InternalExecutor.s.cfg.Settings.SV) { - return skipWritePath(ctx, txn, entries, !writeToEventLog) - } - - return batchInsertEventRecords(ctx, execCfg.InternalExecutor, + return insertEventRecords(ctx, execCfg.InternalExecutor, txn, int32(execCfg.NodeID.SQLInstanceID()), + false, /* skipExternalLog */ + !writeToEventLog, /* onlyLog */ entries..., ) } @@ -214,14 +213,13 @@ func LogEventForJobs( m.Description = payload.Description // Delegate the storing of the event to the regular event logic. - return InsertEventRecord( + return insertEventRecords( ctx, execCfg.InternalExecutor, txn, - 0, /* targetID */ - int32(execCfg.NodeID.SQLInstanceID()), - false, /* skipExternalLog */ - event, - false, /* onlyLog */ + int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + false, /* skipExternalLog */ + false, /* onlyLog */ + eventLogEntry{event: event}, ) } @@ -234,18 +232,7 @@ var eventLogEnabled = settings.RegisterBoolSetting( // InsertEventRecord inserts a single event into the event log as part // of the provided transaction, using the provided internal executor. // -// The caller is responsible for populating the timestamp field -// in the event payload. -// -// If the skipExternalLog bool is set, this function does not call -// log.StructuredEvent(). In that case, the caller is responsible for -// calling log.StructuredEvent() directly. -// -// Note: the targetID and reportingID columns are deprecated and -// should be removed after v21.1 is released. -// -// If onlyLog is set, this function guarantees that it returns no -// error. +// This converts to a call to insertEventRecords() with just 1 entry. 
func InsertEventRecord( ctx context.Context, ex *InternalExecutor, @@ -255,177 +242,124 @@ func InsertEventRecord( info eventpb.EventPayload, onlyLog bool, ) error { - if onlyLog || !eventLogEnabled.Get(&ex.s.cfg.Settings.SV) { - return skipWritePath(ctx, txn, []eventLogEntry{{event: info}}, onlyLog) - } - return batchInsertEventRecords( - ctx, ex, txn, - reportingID, - eventLogEntry{ - targetID: targetID, - event: info, - }) + return insertEventRecords(ctx, ex, txn, reportingID, + skipExternalLog, onlyLog, + eventLogEntry{targetID: targetID, event: info}) } -// batchInsertEventRecords is like InsertEventRecord except it takes -// a slice of events to batch write. Any insert that calls this function -// will always write to the event table (i.e. it won't only log them, and writing -// to the event table will not be disabled). -func batchInsertEventRecords( +// insertEventRecords inserts one or more events into the event log as +// part of the provided txn, using the provided internal executor. +// +// The caller is responsible for populating the timestamp field in the +// event payload and all the other per-payload specific fields. This +// function only takes care of populating the EventType field based on +// the run-time type of the event payload. +// +// Note: the targetID and reportingID columns are deprecated and +// should be removed after v21.1 is released. +func insertEventRecords( ctx context.Context, ex *InternalExecutor, txn *kv.Txn, reportingID int32, + skipExternalLog bool, + onlyLog bool, entries ...eventLogEntry, ) error { - const colsPerEvent = 5 - const baseQuery = ` -INSERT INTO system.eventlog ( - timestamp, "eventType", "targetID", "reportingID", info -) -VALUES($1, $2, $3, $4, $5)` - args := make([]interface{}, 0, len(entries)*colsPerEvent) - - // Prepare first row so we can take the fast path if we're only inserting one event log. 
- if err := prepareRow( - ctx, txn, &args, entries[0].event, entries[0].targetID, reportingID, - ); err != nil { - return err - } - if len(entries) == 1 { - return execEventLogInsert(ctx, ex, txn, baseQuery, args, len(entries)) - } - - var additionalRows strings.Builder - for i := 1; i < len(entries); i++ { - var placeholderNum = 1 + (i * colsPerEvent) - if err := prepareRow(ctx, txn, &args, entries[i].event, entries[i].targetID, reportingID); err != nil { - return err - } - additionalRows.WriteString(fmt.Sprintf(", ($%d, $%d, $%d, $%d, $%d)", - placeholderNum, placeholderNum+1, placeholderNum+2, placeholderNum+3, placeholderNum+4)) - } - - rows, err := ex.Exec(ctx, "log-event", txn, baseQuery+additionalRows.String(), args...) - if err != nil { - return err - } - if rows != len(entries) { - return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(entries)) - } - return nil -} - -// skipWritePath is used when either onlyLog is true, or writes to the event log -// table are disabled. In these cases, we do not write to the event log table. -func skipWritePath(ctx context.Context, txn *kv.Txn, entries []eventLogEntry, onlyLog bool) error { + // Finish populating the entries. for i := range entries { - if err := setupEventAndMaybeLog( - ctx, txn, entries[i].event, onlyLog, - ); err != nil { - return err + // Ensure the type field is populated. + event := entries[i].event + eventType := eventpb.GetEventTypeName(event) + event.CommonDetails().EventType = eventType + + // The caller is responsible for the timestamp field. + if event.CommonDetails().Timestamp == 0 { + return errors.AssertionFailedf("programming error: timestamp field in event %d not populated: %T", i, event) } } - return nil -} - -// setupEventAndMaybeLog prepares the event log to be written. Also, -// if onlyLog is true, it will log the event. 
-func setupEventAndMaybeLog( - ctx context.Context, txn *kv.Txn, info eventpb.EventPayload, onlyLog bool, -) error { - eventType := eventpb.GetEventTypeName(info) - - // Ensure the type field is populated. - info.CommonDetails().EventType = eventType - - // The caller is responsible for the timestamp field. - if info.CommonDetails().Timestamp == 0 { - return errors.AssertionFailedf("programming error: timestamp field in event not populated: %T", info) - } - // If we only want to log and not write to the events table, early exit. - if onlyLog { - log.StructuredEvent(ctx, info) + // If we only want to log externally and not write to the events table, early exit. + loggingToSystemTable := !onlyLog && eventLogEnabled.Get(&ex.s.cfg.Settings.SV) + if !loggingToSystemTable { + // Simply emit the events to their respective channels and call it a day. + if !skipExternalLog { + for i := range entries { + log.StructuredEvent(ctx, entries[i].event) + } + } + // Not writing to system table: shortcut. return nil } - // Ensure that the external logging sees the event when the - // transaction commits. - txn.AddCommitTrigger(func(ctx context.Context) { - log.StructuredEvent(ctx, info) - }) + // When logging to the system table, ensure that the external + // logging only sees the event when the transaction commits. + if !skipExternalLog { + txn.AddCommitTrigger(func(ctx context.Context) { + for i := range entries { + log.StructuredEvent(ctx, entries[i].event) + } + }) + } - return nil -} + // The function below this point is specialized to write to the + // system table. -// constructArgs constructs the values for a single event-log row insert. 
-func constructArgs( - args *[]interface{}, - event eventpb.EventPayload, - eventType string, - targetID int32, - reportingID int32, -) error { - *args = append( - *args, - timeutil.Unix(0, event.CommonDetails().Timestamp), - eventType, targetID, - reportingID, - ) - var info interface{} - if event != nil { + const colsPerEvent = 5 + const baseQuery = ` +INSERT INTO system.eventlog ( + timestamp, "eventType", "targetID", "reportingID", info +) +VALUES($1, $2, $3, $4, $5)` + args := make([]interface{}, 0, len(entries)*colsPerEvent) + constructArgs := func(reportingID int32, entry eventLogEntry) error { + event := entry.event infoBytes, err := json.Marshal(event) if err != nil { return err } - info = string(infoBytes) + eventType := eventpb.GetEventTypeName(event) + args = append( + args, + timeutil.Unix(0, event.CommonDetails().Timestamp), + eventType, + entry.targetID, + reportingID, + string(infoBytes), + ) + return nil } - *args = append(*args, info) - return nil -} -// execEventLogInsert executes the insert query to insert the new events -// into the event log table. -func execEventLogInsert( - ctx context.Context, - ex *InternalExecutor, - txn *kv.Txn, - query string, - args []interface{}, - numEvents int, -) error { - rows, err := ex.Exec(ctx, "log-event", txn, query, args...) - if err != nil { + // In the common case where we have just 1 event, we want to skip + // the extra heap allocation and buffer operations of the loop + // below. This is an optimization. + query := baseQuery + if err := constructArgs(reportingID, entries[0]); err != nil { return err } - if rows != numEvents { - return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, numEvents) + if len(entries) > 1 { + // Extend the query with additional VALUES clauses for all the + // events after the first one. 
+ var completeQuery strings.Builder + completeQuery.WriteString(baseQuery) + + for _, extraEntry := range entries[1:] { + placeholderNum := 1 + len(args) + if err := constructArgs(reportingID, extraEntry); err != nil { + return err + } + fmt.Fprintf(&completeQuery, ", ($%d, $%d, $%d, $%d, $%d)", + placeholderNum, placeholderNum+1, placeholderNum+2, placeholderNum+3, placeholderNum+4) + } + query = completeQuery.String() } - return nil -} -// prepareRow creates the values of an insert for a row. It populates the -// event payload with additional info, and then adds the values of the row to args. -func prepareRow( - ctx context.Context, - txn *kv.Txn, - args *[]interface{}, - event eventpb.EventPayload, - targetID int32, - reportingID int32, -) error { - // Setup event log. - eventType := eventpb.GetEventTypeName(event) - if err := setupEventAndMaybeLog( - ctx, txn, event, false, /* onlyLog */ - ); err != nil { + rows, err := ex.Exec(ctx, "log-event", txn, query, args...) + if err != nil { return err } - - // Construct the args for this row. - if err := constructArgs(args, event, eventType, targetID, reportingID); err != nil { - return err + if rows != len(entries) { + return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(entries)) } return nil } From 3be70f18907125422f07e773c9513d03e5ba668a Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 15:55:37 +0200 Subject: [PATCH 3/8] sql: rename the system event log cluster setting This clarifies its purpose. 
Release note: None --- pkg/sql/event_log.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 4b236129c596..601ceb40ec34 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -223,7 +223,7 @@ func LogEventForJobs( ) } -var eventLogEnabled = settings.RegisterBoolSetting( +var eventLogSystemTableEnabled = settings.RegisterBoolSetting( "server.eventlog.enabled", "if set, logged notable events are also stored in the table system.eventlog", true, @@ -280,7 +280,7 @@ func insertEventRecords( } // If we only want to log externally and not write to the events table, early exit. - loggingToSystemTable := !onlyLog && eventLogEnabled.Get(&ex.s.cfg.Settings.SV) + loggingToSystemTable := !onlyLog && eventLogSystemTableEnabled.Get(&ex.s.cfg.Settings.SV) if !loggingToSystemTable { // Simply emit the events to their respective channels and call it a day. if !skipExternalLog { From b196d8333991e0868ff72b667c5b0d7eff354820 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 15:58:51 +0200 Subject: [PATCH 4/8] sql: move a function to its right place `logEventsOnlyExternally()` is specific to the code in `exec_log.go` and should thus reside there. This patch achieves that. Release note: None --- pkg/sql/event_log.go | 6 ------ pkg/sql/exec_log.go | 17 +++++++++++------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 601ceb40ec34..45dc6fcec21e 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -59,12 +59,6 @@ func (p *planner) logEvents(ctx context.Context, entries ...eventLogEntry) error return p.logEventsWithSystemEventLogOption(ctx, true /* writeToEventLog */, entries...) } -func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventLogEntry) { - // The API contract for logEventWithSystemEventLogOption() is that it returns - // no error when system.eventlog is not written to. 
- _ = p.logEventsWithSystemEventLogOption(ctx, false /* writeToEventLog */, entries...) -} - // logEventsWithSystemEventLogOption is like logEvent() but it gives // control to the caller as to whether the entry is written into // system.eventlog. diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 5c7474beaf18..dc88ea779680 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -325,14 +325,12 @@ func (p *planner) maybeLogStatementInternal( switch { case execType == executorTypeExec: // Non-internal queries are always logged to the slow query log. - p.logEventsOnlyExternally(ctx, - eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}}) + p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}}) case execType == executorTypeInternal && slowInternalQueryLogEnabled: // Internal queries that surpass the slow query log threshold should only // be logged to the slow-internal-only log if the cluster setting dictates. - p.logEventsOnlyExternally(ctx, - eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}}) + p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}}) } } @@ -342,11 +340,18 @@ func (p *planner) maybeLogStatementInternal( } if shouldLogToAdminAuditLog { - p.logEventsOnlyExternally(ctx, - eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}}) + p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}}) } } +func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventLogEntry) { + // The API contract for logEventsWithSystemEventLogOption() is that it returns + // no error when system.eventlog is not written to. + _ = p.logEventsWithSystemEventLogOption(ctx, + false, /* writeToEventLog */ + entries...) 
+} + // maybeAudit marks the current plan being constructed as flagged // for auditing if the table being touched has an auditing mode set. // This is later picked up by maybeLogStatement() above. From 07ab3b827d0c3237f55f66e3d8bd095586353616 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 16:11:30 +0200 Subject: [PATCH 5/8] sql/event_log: avoid boolean parameters The code was previously using two booleans `onlyLog` and `writeToEventLog` which were making the code difficult to understand and to maintain. This patch fixes this by introducing a bitset with descriptive names. Release note: None --- pkg/server/node.go | 7 ++-- pkg/server/server.go | 6 ++-- pkg/sql/create_stats.go | 4 +-- pkg/sql/event_log.go | 77 +++++++++++++++++++++++++++-------------- pkg/sql/exec_log.go | 6 ++-- 5 files changed, 62 insertions(+), 38 deletions(-) diff --git a/pkg/server/node.go b/pkg/server/node.go index 7eaffa16abed..b8bdbc39a79c 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -823,11 +823,10 @@ func (n *Node) recordJoinEvent(ctx context.Context) { if err := n.storeCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return sql.InsertEventRecord(ctx, n.sqlExec, txn, - int32(n.Descriptor.NodeID), - int32(n.Descriptor.NodeID), - true, /* skipExternalLog - we already call log.StructuredEvent above */ + int32(n.Descriptor.NodeID), /* reporting ID: the node where the event is logged */ + sql.LogToSystemTable, /* LogEventDestination: we already call log.StructuredEvent above */ + int32(n.Descriptor.NodeID), /* target ID: the node that is joining (ourselves) */ event, - false, /* onlyLog */ ) }); err != nil { log.Warningf(ctx, "%s: unable to log event %v: %v", n, event, err) diff --git a/pkg/server/server.go b/pkg/server/server.go index 27c544ef2f43..051713895748 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2346,10 +2346,10 @@ func (s *Server) Decommission( ctx, s.sqlServer.execCfg.InternalExecutor, txn, - int32(nodeID), 
int32(s.NodeID()), - true, /* skipExternalLog - we already call log.StructuredEvent above */ + int32(s.NodeID()), /* reporting ID: the node where the event is logged */ + sql.LogToSystemTable, /* we already call log.StructuredEvent above */ + int32(nodeID), /* target ID: the node that we wee a membership change for */ event, - false, /* onlyLog */ ) }); err != nil { log.Ops.Errorf(ctx, "unable to record event: %+v: %+v", event, err) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 50b28a9ac5d2..11b4126ba052 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -580,8 +580,8 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er evalCtx.SessionData.ApplicationName, details.Statement, "CREATE STATISTICS", - nil, /* no placeholders known at this point */ - true, /* writeToEventLog */ + nil, /* no placeholders known at this point */ + eventLogOptions{dst: LogEverywhere}, eventLogEntry{ targetID: int32(details.Table.ID), event: &eventpb.CreateStatistics{ diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 45dc6fcec21e..0f53669b007a 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -47,7 +47,8 @@ type eventLogEntry struct { func (p *planner) logEvent( ctx context.Context, descID descpb.ID, event eventpb.EventPayload, ) error { - return p.logEventsWithSystemEventLogOption(ctx, true, /* writeToEventLog */ + return p.logEventsWithOptions(ctx, + eventLogOptions{dst: LogEverywhere}, eventLogEntry{targetID: int32(descID), event: event}) } @@ -56,24 +57,31 @@ func (p *planner) logEvent( // that produce multiple events, e.g. GRANT, as they will // processed using only one write batch (and thus lower latency). func (p *planner) logEvents(ctx context.Context, entries ...eventLogEntry) error { - return p.logEventsWithSystemEventLogOption(ctx, true /* writeToEventLog */, entries...) + return p.logEventsWithOptions(ctx, + eventLogOptions{dst: LogEverywhere}, + entries...) 
} -// logEventsWithSystemEventLogOption is like logEvent() but it gives -// control to the caller as to whether the entry is written into -// system.eventlog. +// eventLogOptions +type eventLogOptions struct { + // Where to emit the log event to. + dst LogEventDestination +} + +// logEventsWithOptions is like logEvent() but it gives control to the +// caller as to where the event is written to. // -// If writeToEventLog is false, this function guarantees that it -// returns no error. -func (p *planner) logEventsWithSystemEventLogOption( - ctx context.Context, writeToEventLog bool, entries ...eventLogEntry, +// If opts.dst does not include LogToSystemTable, this function is +// guaranteed to not return an error. +func (p *planner) logEventsWithOptions( + ctx context.Context, opts eventLogOptions, entries ...eventLogEntry, ) error { user := p.User() stmt := tree.AsStringWithFQNames(p.stmt.AST, p.extendedEvalCtx.EvalContext.Annotations) stmtTag := p.stmt.AST.StatementTag() pl := p.extendedEvalCtx.EvalContext.Placeholders.Values appName := p.SessionData().ApplicationName - return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, user, appName, stmt, stmtTag, pl, writeToEventLog, entries...) + return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, user, appName, stmt, stmtTag, pl, opts, entries...) 
} // logEventInternalForSchemaChange emits a cluster event in the @@ -102,8 +110,7 @@ func logEventInternalForSchemaChanges( ctx, execCfg.InternalExecutor, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ - false, /* skipExternalLog */ - false, /* onlyLog */ + eventLogOptions{dst: LogEverywhere}, eventLogEntry{ targetID: int32(descID), event: event, @@ -128,7 +135,7 @@ func logEventInternalForSQLStatements( stmt string, stmtTag string, placeholders tree.QueryArguments, - writeToEventLog bool, + opts eventLogOptions, entries ...eventLogEntry, ) error { // Inject the common fields into the payload provided by the caller. @@ -143,8 +150,7 @@ func logEventInternalForSQLStatements( return insertEventRecords(ctx, execCfg.InternalExecutor, txn, int32(execCfg.NodeID.SQLInstanceID()), - false, /* skipExternalLog */ - !writeToEventLog, /* onlyLog */ + opts, entries..., ) } @@ -211,8 +217,7 @@ func LogEventForJobs( ctx, execCfg.InternalExecutor, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ - false, /* skipExternalLog */ - false, /* onlyLog */ + eventLogOptions{dst: LogEverywhere}, eventLogEntry{event: event}, ) } @@ -223,6 +228,27 @@ var eventLogSystemTableEnabled = settings.RegisterBoolSetting( true, ).WithPublic() +// LogEventDestination indicates for InsertEventRecord where the +// event should be directed to. +type LogEventDestination int + +func (d LogEventDestination) hasFlag(f LogEventDestination) bool { + return d&f != 0 +} + +const ( + // LogToSystemTable makes InsertEventRecord write one or more + // entries to the system eventlog table. (This behavior may be + // removed in a later version.) + LogToSystemTable LogEventDestination = 1 << iota + // LogExternally makes InsertEventRecord write the event(s) to the + // external logs. + LogExternally + + // LogEverywhere logs to all the possible outputs. 
+ LogEverywhere LogEventDestination = LogExternally | LogToSystemTable +) + // InsertEventRecord inserts a single event into the event log as part // of the provided transaction, using the provided internal executor. // @@ -231,13 +257,13 @@ func InsertEventRecord( ctx context.Context, ex *InternalExecutor, txn *kv.Txn, - targetID, reportingID int32, - skipExternalLog bool, + reportingID int32, + dst LogEventDestination, + targetID int32, info eventpb.EventPayload, - onlyLog bool, ) error { return insertEventRecords(ctx, ex, txn, reportingID, - skipExternalLog, onlyLog, + eventLogOptions{dst: dst}, eventLogEntry{targetID: targetID, event: info}) } @@ -256,8 +282,7 @@ func insertEventRecords( ex *InternalExecutor, txn *kv.Txn, reportingID int32, - skipExternalLog bool, - onlyLog bool, + opts eventLogOptions, entries ...eventLogEntry, ) error { // Finish populating the entries. @@ -274,10 +299,10 @@ func insertEventRecords( } // If we only want to log externally and not write to the events table, early exit. - loggingToSystemTable := !onlyLog && eventLogSystemTableEnabled.Get(&ex.s.cfg.Settings.SV) + loggingToSystemTable := opts.dst.hasFlag(LogToSystemTable) && eventLogSystemTableEnabled.Get(&ex.s.cfg.Settings.SV) if !loggingToSystemTable { // Simply emit the events to their respective channels and call it a day. - if !skipExternalLog { + if opts.dst.hasFlag(LogExternally) { for i := range entries { log.StructuredEvent(ctx, entries[i].event) } @@ -288,7 +313,7 @@ func insertEventRecords( // When logging to the system table, ensure that the external // logging only sees the event when the transaction commits. 
- if !skipExternalLog { + if opts.dst.hasFlag(LogExternally) { txn.AddCommitTrigger(func(ctx context.Context) { for i := range entries { log.StructuredEvent(ctx, entries[i].event) diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index dc88ea779680..67145eb2b5de 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -345,10 +345,10 @@ func (p *planner) maybeLogStatementInternal( } func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventLogEntry) { - // The API contract for logEventsWithSystemEventLogOption() is that it returns + // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. - _ = p.logEventsWithSystemEventLogOption(ctx, - false, /* writeToEventLog */ + _ = p.logEventsWithOptions(ctx, + eventLogOptions{dst: LogExternally}, entries...) } From 6e0deb5132338a873257074941e0402021060888 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 16:16:22 +0200 Subject: [PATCH 6/8] sql: pass SQL exec details as a struct to the event log functions This further simplifies the internal API. Release note: None --- pkg/sql/create_stats.go | 18 ++++--- pkg/sql/event_log.go | 101 ++++++++++++++++++++-------------------- 2 files changed, 62 insertions(+), 57 deletions(-) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 11b4126ba052..ec0f39cc54ce 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -575,18 +575,22 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er // CREATE STATISTICS statement. 
// See: https://github.com/cockroachdb/cockroach/issues/57739 return evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return logEventInternalForSQLStatements(ctx, evalCtx.ExecCfg, txn, - evalCtx.SessionData.User(), - evalCtx.SessionData.ApplicationName, - details.Statement, - "CREATE STATISTICS", - nil, /* no placeholders known at this point */ + return logEventInternalForSQLStatements(ctx, + evalCtx.ExecCfg, txn, eventLogOptions{dst: LogEverywhere}, + sqlEventCommonExecPayload{ + user: evalCtx.SessionData.User(), + appName: evalCtx.SessionData.ApplicationName, + stmt: details.Statement, + stmtTag: "CREATE STATISTICS", + placeholders: nil, /* no placeholders known at this point */ + }, eventLogEntry{ targetID: int32(details.Table.ID), event: &eventpb.CreateStatistics{ TableName: details.FQTableName, - }}, + }, + }, ) }) } diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 0f53669b007a..185efa58de05 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -76,12 +76,18 @@ type eventLogOptions struct { func (p *planner) logEventsWithOptions( ctx context.Context, opts eventLogOptions, entries ...eventLogEntry, ) error { - user := p.User() - stmt := tree.AsStringWithFQNames(p.stmt.AST, p.extendedEvalCtx.EvalContext.Annotations) - stmtTag := p.stmt.AST.StatementTag() - pl := p.extendedEvalCtx.EvalContext.Placeholders.Values - appName := p.SessionData().ApplicationName - return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, user, appName, stmt, stmtTag, pl, opts, entries...) + commonPayload := sqlEventCommonExecPayload{ + user: p.User(), + stmt: tree.AsStringWithFQNames(p.stmt.AST, p.extendedEvalCtx.EvalContext.Annotations), + stmtTag: p.stmt.AST.StatementTag(), + placeholders: p.extendedEvalCtx.EvalContext.Placeholders.Values, + appName: p.SessionData().ApplicationName, + } + return logEventInternalForSQLStatements(ctx, + p.extendedEvalCtx.ExecCfg, p.txn, + opts, + commonPayload, + entries...) 
} // logEventInternalForSchemaChange emits a cluster event in the @@ -118,6 +124,16 @@ func logEventInternalForSchemaChanges( ) } +// sqlEventCommonExecPayload contains the statement and session details +// necessary to populate an eventpb.CommonSQLExecDetails. +type sqlEventCommonExecPayload struct { + user security.SQLUsername + stmt string + stmtTag string + placeholders tree.QueryArguments + appName string +} + // logEventInternalForSQLStatements emits a cluster event on behalf of // a SQL statement, when the point where the event is emitted does not // have access to a (*planner) and the current statement metadata. @@ -130,62 +146,47 @@ func logEventInternalForSQLStatements( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, - user security.SQLUsername, - appName string, - stmt string, - stmtTag string, - placeholders tree.QueryArguments, opts eventLogOptions, + commonPayload sqlEventCommonExecPayload, entries ...eventLogEntry, ) error { // Inject the common fields into the payload provided by the caller. 
+ injectCommonFields := func(entry eventLogEntry) error { + event := entry.event + event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime + sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload) + if !ok { + return errors.AssertionFailedf("unknown event type: %T", event) + } + m := sqlCommon.CommonSQLDetails() + m.Statement = commonPayload.stmt + m.Tag = commonPayload.stmtTag + m.ApplicationName = commonPayload.appName + m.User = commonPayload.user.Normalized() + m.DescriptorID = uint32(entry.targetID) + if pls := commonPayload.placeholders; len(pls) > 0 { + m.PlaceholderValues = make([]string, len(pls)) + for idx, val := range pls { + m.PlaceholderValues[idx] = val.String() + } + } + return nil + } + for i := range entries { - if err := injectCommonFields( - txn, entries[i].targetID, user, appName, stmt, stmtTag, placeholders, entries[i].event, - ); err != nil { + if err := injectCommonFields(entries[i]); err != nil { return err } } - return insertEventRecords(ctx, execCfg.InternalExecutor, - txn, - int32(execCfg.NodeID.SQLInstanceID()), - opts, - entries..., + return insertEventRecords(ctx, + execCfg.InternalExecutor, txn, + int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + opts, /* eventLogOptions */ + entries..., /* ...eventLogEntry */ ) } -// injectCommonFields injects the common fields into the event payload provided by the caller. 
-func injectCommonFields( - txn *kv.Txn, - descID int32, - user security.SQLUsername, - appName string, - stmt string, - stmtTag string, - placeholders tree.QueryArguments, - event eventpb.EventPayload, -) error { - event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime - sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload) - if !ok { - return errors.AssertionFailedf("unknown event type: %T", event) - } - m := sqlCommon.CommonSQLDetails() - m.Statement = stmt - m.Tag = stmtTag - m.ApplicationName = appName - m.User = user.Normalized() - m.DescriptorID = uint32(descID) - if len(placeholders) > 0 { - m.PlaceholderValues = make([]string, len(placeholders)) - for idx, val := range placeholders { - m.PlaceholderValues[idx] = val.String() - } - } - return nil -} - // LogEventForJobs emits a cluster event in the context of a job. func LogEventForJobs( ctx context.Context, From 1434d057b11c6a60c9f5a2cc2cbd983ff61f6671 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 16:19:36 +0200 Subject: [PATCH 7/8] sql/event_log: clarify the API in an explanatory comment This patch describes `event_log.go` at a high level: an overall event refinement pipeline with a straightforward control flow. Release note: None --- pkg/sql/event_log.go | 87 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 185efa58de05..ec1c2633f08d 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -30,6 +30,93 @@ import ( "github.com/cockroachdb/errors" ) +// The logging functions in this file are the different stages of a +// pipeline that add more and more information to logging events until +// they are ready to be sent to either external sinks or to a system +// table. +// +// The overall structure of this pipeline is as follows: +// +// regular statement execution +// for "special" statements that +// have structured logging, e.g. 
CREATE, DROP etc +// | +// (produces pair(s) of descID (optional) and eventpb.EventPayload) +// | +// (pair(s) optionally packaged as an eventLogEntry{}) +// | +// v +// logEvent(descID, payload) / logEvents(eventEntries...) +// | +// | ,------- query logging in exec_log.go +// | / optionally via logEventsOnlyExternally() +// | / +// v v +// logEventsWithOptions() +// | +// (extracts SQL exec details +// from execution context - see sqlEventCommonExecPayload) +// | +// | ,----------- async CREATE STATS +// | / goroutine +// | / on behalf of CREATE STATS stmt +// v v +// logEventInternalForSQLStatements() +// | (SQL exec details struct +// | and main event struct provided +// | separately as arguments) +// | +// (writes the exec details +// inside the event struct) +// | +// | ,----- job execution, at end +// | | +// | LogEventForJobs() +// | | +// | (add job ID, +// | + fields from job metadata +// | timestamp initialized at job txn read ts) +// | | +// | ,-----------------' +// | / +// | / ,------- async schema change +// | | | execution, at end +// | | v +// | | logEventInternalForSchemaChanges() +// | | | +// | | (add mutation ID, +// | | + fields from sc.change metadata +// | | timestamp initialized to txn read ts) +// | | | +// | | ,-------------' +// | | / +// | | / +// (TargetID argument = ID of descriptor affected if DDL, +// otherwise zero) +// | | | +// | | | ,-------- node-level events outside of SQL +// | | | / (e.g. cluster membership) +// | | | / TargetID = ID of node affected +// v v v v +// (expectation: per-type event structs +// fully populated at this point. +// Timestamp field must be set too.) 
+// | +// v +// InsertEventRecord() / insertEventRecords() +// | +// (finalize field EventType from struct type) +// | +// (route) +// | +// +--> system.eventlog if not disabled by setting +// | +// +--> DEV channel if requested by log.V +// | +// `--> external sinks (via logging package) +// +// + // eventLogEntry represents a SQL-level event to be sent to logging // outputs(s). type eventLogEntry struct { From 5d96260cad5bf0ad214b81b02067f149f7b52b8d Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 May 2021 16:22:53 +0200 Subject: [PATCH 8/8] sql/event_log: fix bug with verbose logging This patch fixes a bug introduced when query logging started using structured events: the loss of the ability to automatically copy all the execution events to the DEV channel when setting the `vmodule` setting to `exec_log=2` (or above). In addition to fixing that bug, the following new vmodule-based abilities are added: - events for DDL statements and others that call `logEvent()` can now be collected in the DEV channel by using the name of the source file where they were generated as filter (e.g. `vmodule=create_table=2` for the CREATE TABLE events). - events of other kinds can be collected in the DEV channel by setting `vmodule=event_log=2`. (Note a subtle difference between `vmodule=create_table=2` and `vmodule=exec_log=2`: the former emits the event to the DEV channel while the stmt is executed; the latter emits the event after the stmt completes. If both are enabled, TWO events are sent to the DEV channel.) Since all the vmodule filtering options are subject to change without notice between versions, we do not wish to document these nuances. For this reason, the release note below is left blank. 
Release note: None --- pkg/server/node.go | 4 +-- pkg/server/server.go | 6 ++--- pkg/sql/create_stats.go | 1 + pkg/sql/event_log.go | 55 +++++++++++++++++++++++++++++++++++++++-- pkg/sql/exec_log.go | 20 +++++++++------ 5 files changed, 72 insertions(+), 14 deletions(-) diff --git a/pkg/server/node.go b/pkg/server/node.go index b8bdbc39a79c..2d6a8086e046 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -824,8 +824,8 @@ func (n *Node) recordJoinEvent(ctx context.Context) { return sql.InsertEventRecord(ctx, n.sqlExec, txn, int32(n.Descriptor.NodeID), /* reporting ID: the node where the event is logged */ - sql.LogToSystemTable, /* LogEventDestination: we already call log.StructuredEvent above */ - int32(n.Descriptor.NodeID), /* target ID: the node that is joining (ourselves) */ + sql.LogToSystemTable|sql.LogToDevChannelIfVerbose, /* LogEventDestination: we already call log.StructuredEvent above */ + int32(n.Descriptor.NodeID), /* target ID: the node that is joining (ourselves) */ event, ) }); err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index 051713895748..2ba695f17f0c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2346,9 +2346,9 @@ func (s *Server) Decommission( ctx, s.sqlServer.execCfg.InternalExecutor, txn, - int32(s.NodeID()), /* reporting ID: the node where the event is logged */ - sql.LogToSystemTable, /* we already call log.StructuredEvent above */ - int32(nodeID), /* target ID: the node that we wee a membership change for */ + int32(s.NodeID()), /* reporting ID: the node where the event is logged */ + sql.LogToSystemTable|sql.LogToDevChannelIfVerbose, /* we already call log.StructuredEvent above */ + int32(nodeID), /* target ID: the node that we wee a membership change for */ event, ) }); err != nil { diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index ec0f39cc54ce..d7b5dbb49b6f 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -577,6 +577,7 @@ func (r 
*createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er return evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return logEventInternalForSQLStatements(ctx, evalCtx.ExecCfg, txn, + 0, /* depth: use event_log=2 for vmodule filtering */ eventLogOptions{dst: LogEverywhere}, sqlEventCommonExecPayload{ user: evalCtx.SessionData.User(), diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index ec1c2633f08d..214a733af23e 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -135,6 +135,7 @@ func (p *planner) logEvent( ctx context.Context, descID descpb.ID, event eventpb.EventPayload, ) error { return p.logEventsWithOptions(ctx, + 2, /* depth: use caller location */ eventLogOptions{dst: LogEverywhere}, eventLogEntry{targetID: int32(descID), event: event}) } @@ -145,6 +146,7 @@ func (p *planner) logEvent( // processed using only one write batch (and thus lower latency). func (p *planner) logEvents(ctx context.Context, entries ...eventLogEntry) error { return p.logEventsWithOptions(ctx, + 2, /* depth: use caller location */ eventLogOptions{dst: LogEverywhere}, entries...) } @@ -153,6 +155,15 @@ func (p *planner) logEvents(ctx context.Context, entries ...eventLogEntry) error type eventLogOptions struct { // Where to emit the log event to. dst LogEventDestination + + // By default, a copy of each structured event is sent to the DEV + // channel (in addition to its default, nominal channel) if the + // vmodule filter is set to 2 or higher for the source file where + // the event call originates. + // + // If verboseTraceLevel is non-zero, its value is used as value for + // the vmodule filter. See exec_log for an example use. + verboseTraceLevel log.Level } // logEventsWithOptions is like logEvent() but it gives control to the @@ -161,7 +172,7 @@ type eventLogOptions struct { // If opts.dst does not include LogToSystemTable, this function is // guaranteed to not return an error. 
func (p *planner) logEventsWithOptions( - ctx context.Context, opts eventLogOptions, entries ...eventLogEntry, + ctx context.Context, depth int, opts eventLogOptions, entries ...eventLogEntry, ) error { commonPayload := sqlEventCommonExecPayload{ user: p.User(), @@ -172,6 +183,7 @@ func (p *planner) logEventsWithOptions( } return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, + 1+depth, opts, commonPayload, entries...) @@ -199,10 +211,15 @@ func logEventInternalForSchemaChanges( m.MutationID = uint32(mutationID) // Delegate the storing of the event to the regular event logic. + // + // We use depth=1 because the caller of this function typically + // wraps the call in a db.Txn() callback, which confuses the vmodule + // filtering. Easiest is to pretend the event is sourced here. return insertEventRecords( ctx, execCfg.InternalExecutor, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + 1, /* depth: use this function as origin */ eventLogOptions{dst: LogEverywhere}, eventLogEntry{ targetID: int32(descID), @@ -233,6 +250,7 @@ func logEventInternalForSQLStatements( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, + depth int, opts eventLogOptions, commonPayload sqlEventCommonExecPayload, entries ...eventLogEntry, @@ -269,6 +287,7 @@ func logEventInternalForSQLStatements( return insertEventRecords(ctx, execCfg.InternalExecutor, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + 1+depth, /* depth */ opts, /* eventLogOptions */ entries..., /* ...eventLogEntry */ ) @@ -301,10 +320,15 @@ func LogEventForJobs( m.Description = payload.Description // Delegate the storing of the event to the regular event logic. + // + // We use depth=1 because the caller of this function typically + // wraps the call in a db.Txn() callback, which confuses the vmodule + // filtering. Easiest is to pretend the event is sourced here. 
return insertEventRecords( ctx, execCfg.InternalExecutor, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + 1, /* depth: use this function for vmodule filtering */ eventLogOptions{dst: LogEverywhere}, eventLogEntry{event: event}, ) @@ -332,9 +356,13 @@ const ( // LogExternally makes InsertEventRecord write the event(s) to the // external logs. LogExternally + // LogToDevChannelIfVerbose makes InsertEventRecord copy + // the structured event to the DEV logging channel + // if the vmodule filter for the log call is set high enough. + LogToDevChannelIfVerbose // LogEverywhere logs to all the possible outputs. - LogEverywhere LogEventDestination = LogExternally | LogToSystemTable + LogEverywhere LogEventDestination = LogExternally | LogToSystemTable | LogToDevChannelIfVerbose ) // InsertEventRecord inserts a single event into the event log as part @@ -350,7 +378,11 @@ func InsertEventRecord( targetID int32, info eventpb.EventPayload, ) error { + // We use depth=1 because the caller of this function typically + // wraps the call in a db.Txn() callback, which confuses the vmodule + // filtering. Easiest is to pretend the event is sourced here. return insertEventRecords(ctx, ex, txn, reportingID, + 1, /* depth: use this function */ eventLogOptions{dst: dst}, eventLogEntry{targetID: targetID, event: info}) } @@ -370,6 +402,7 @@ func insertEventRecords( ex *InternalExecutor, txn *kv.Txn, reportingID int32, + depth int, opts eventLogOptions, entries ...eventLogEntry, ) error { @@ -386,6 +419,24 @@ func insertEventRecords( } } + if opts.dst.hasFlag(LogToDevChannelIfVerbose) { + // Emit a copy of the structured event to the DEV channel when the + // vmodule setting matches. + level := log.Level(2) + if opts.verboseTraceLevel != 0 { + // Caller has overridden the level at which we log to the + // trace. 
+ level = opts.verboseTraceLevel + } + if log.VDepth(level, depth) { + // The VDepth() call ensures that we are matching the vmodule + // setting to where the depth is equal to 1 in the caller stack. + for i := range entries { + log.InfofDepth(ctx, depth, "SQL event: target %d, payload %+v", entries[i].targetID, entries[i].event) + } + } + } + // If we only want to log externally and not write to the events table, early exit. loggingToSystemTable := opts.dst.hasFlag(LogToSystemTable) && eventLogSystemTableEnabled.Get(&ex.s.cfg.Settings.SV) if !loggingToSystemTable { diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 67145eb2b5de..f95b2733ba02 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -276,11 +276,6 @@ func (p *planner) maybeLogStatementInternal( TxnCounter: uint32(txnCounter), } - if logV { - // Copy to the debug log / trace. - log.VEventf(ctx, execType.vLevel(), "%+v", execDetails) - } - if auditEventsDetected { // TODO(knz): re-add the placeholders and age into the logging event. entries := make([]eventLogEntry, len(p.curPlan.auditEvents)) @@ -334,8 +329,18 @@ func (p *planner) maybeLogStatementInternal( } } - if logExecuteEnabled { - p.logEventsOnlyExternally(ctx, + if logExecuteEnabled || logV { + // The API contract for logEventsWithOptions() is that it returns + // no error when system.eventlog is not written to. + _ = p.logEventsWithOptions(ctx, + 1, /* depth */ + eventLogOptions{ + // We pass LogToDevChannelIfVerbose because we have a log.V + // request for this file, which means the operator wants to + // see a copy of the execution on the DEV Channel. 
+ dst: LogExternally | LogToDevChannelIfVerbose, + verboseTraceLevel: execType.vLevel(), + }, eventLogEntry{event: &eventpb.QueryExecute{CommonSQLExecDetails: execDetails}}) } @@ -348,6 +353,7 @@ func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventL // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. _ = p.logEventsWithOptions(ctx, + 2, /* depth: we want to use the caller location */ eventLogOptions{dst: LogExternally}, entries...) }