diff --git a/pkg/server/node.go b/pkg/server/node.go index 7eaffa16abed..2d6a8086e046 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -823,11 +823,10 @@ func (n *Node) recordJoinEvent(ctx context.Context) { if err := n.storeCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return sql.InsertEventRecord(ctx, n.sqlExec, txn, - int32(n.Descriptor.NodeID), - int32(n.Descriptor.NodeID), - true, /* skipExternalLog - we already call log.StructuredEvent above */ + int32(n.Descriptor.NodeID), /* reporting ID: the node where the event is logged */ + sql.LogToSystemTable|sql.LogToDevChannelIfVerbose, /* LogEventDestination: we already call log.StructuredEvent above */ + int32(n.Descriptor.NodeID), /* target ID: the node that is joining (ourselves) */ event, - false, /* onlyLog */ ) }); err != nil { log.Warningf(ctx, "%s: unable to log event %v: %v", n, event, err) diff --git a/pkg/server/server.go b/pkg/server/server.go index 27c544ef2f43..2ba695f17f0c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2346,10 +2346,10 @@ func (s *Server) Decommission( ctx, s.sqlServer.execCfg.InternalExecutor, txn, - int32(nodeID), int32(s.NodeID()), - true, /* skipExternalLog - we already call log.StructuredEvent above */ + int32(s.NodeID()), /* reporting ID: the node where the event is logged */ + sql.LogToSystemTable|sql.LogToDevChannelIfVerbose, /* we already call log.StructuredEvent above */ + int32(nodeID), /* target ID: the node that we wee a membership change for */ event, - false, /* onlyLog */ ) }); err != nil { log.Ops.Errorf(ctx, "unable to record event: %+v: %+v", event, err) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 00b030728aaa..d7b5dbb49b6f 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -575,16 +575,22 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er // CREATE STATISTICS statement. // See: https://github.com/cockroachdb/cockroach/issues/57739 return evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return logEventInternalForSQLStatements(ctx, evalCtx.ExecCfg, txn, - descpb.IDs{details.Table.ID}, - evalCtx.SessionData.User(), - evalCtx.SessionData.ApplicationName, - details.Statement, - "CREATE STATISTICS", - nil, /* no placeholders known at this point */ - true, /* writeToEventLog */ - &eventpb.CreateStatistics{ - TableName: details.FQTableName, + return logEventInternalForSQLStatements(ctx, + evalCtx.ExecCfg, txn, + 0, /* depth: use event_log=2 for vmodule filtering */ + eventLogOptions{dst: LogEverywhere}, + sqlEventCommonExecPayload{ + user: evalCtx.SessionData.User(), + appName: evalCtx.SessionData.ApplicationName, + stmt: details.Statement, + stmtTag: "CREATE STATISTICS", + placeholders: nil, /* no placeholders known at this point */ + }, + eventLogEntry{ + targetID: int32(details.Table.ID), + event: &eventpb.CreateStatistics{ + TableName: details.FQTableName, + }, }, ) }) diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 74e60b7a9e21..214a733af23e 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -30,45 +30,163 @@ import ( "github.com/cockroachdb/errors" ) +// The logging functions in this file are the different stages of a +// pipeline that add more and more information to logging events until +// they are ready to be sent to either external sinks or to a system +// table. +// +// The overall structure of this pipeline is as follows: +// +// regular statement execution +// for "special" statements that +// have structured logging, e.g. CREATE, DROP etc +// | +// (produces pair(s) of descID (optional) and eventpb.EventPayload) +// | +// (pair(s) optionally packaged as an eventLogEntry{}) +// | +// v +// logEvent(descID, payload) / logEvents(eventEntries...) +// | +// | ,------- query logging in exec_log.go +// | / optionally via logEventOnlyExternally() +// | / +// v v +// logEventsWithOptions() +// | +// (extracts SQL exec details +// from execution context - see sqlEventCommonExecPayload) +// | +// | ,----------- async CREATE STATS +// | / goroutine +// | / on behalf of CREATE STATS stmt +// v v +// logEventInternalForSQLStatements() +// | (SQL exec details struct +// | and main event struct provided +// | separately as arguments) +// | +// (writes the exec details +// inside the event struct) +// | +// | ,----- job execution, at end +// | | +// | LogEventForJobs() +// | | +// | (add job ID, +// | + fields from job metadata +// | timestamp initialized at job txn read ts) +// | | +// | ,-----------------' +// | / +// | / ,------- async schema change +// | | | execution, at end +// | | v +// | | logEventInternalForSchemaChanges() +// | | | +// | | (add mutation ID, +// | | + fields from sc.change metadata +// | | timestamp initialized to txn read ts) +// | | | +// | | ,-------------' +// | | / +// | | / +// (TargetID argument = ID of descriptor affected if DDL, +// otherwise zero) +// | | | +// | | | ,-------- node-level events outside of SQL +// | | | / (e.g. cluster membership) +// | | | / TargetID = ID of node affected +// v v v v +// (expectation: per-type event structs +// fully populated at this point. +// Timestamp field must be set too.) +// | +// v +// InsertEventRecord() / insertEventRecords() +// | +// (finalize field EventType from struct type) +// | +// (route) +// | +// +--> system.eventlog if not disabled by setting +// | +// +--> DEV channel if requested by log.V +// | +// `--> external sinks (via logging package) +// +// + +// eventLogEntry represents a SQL-level event to be sent to logging +// outputs(s). +type eventLogEntry struct { + // targetID is the main object affected by this event. + // For DDL statements, this is typically the ID of + // the affected descriptor. + targetID int32 + + // event is the main event payload. + event eventpb.EventPayload +} + // logEvent emits a cluster event in the context of a regular SQL // statement. func (p *planner) logEvent( - ctx context.Context, descID descpb.ID, events eventpb.EventPayload, + ctx context.Context, descID descpb.ID, event eventpb.EventPayload, ) error { - return p.logEventsWithSystemEventLogOption(ctx, descpb.IDs{descID}, true /* writeToEventLog */, events) + return p.logEventsWithOptions(ctx, + 2, /* depth: use caller location */ + eventLogOptions{dst: LogEverywhere}, + eventLogEntry{targetID: int32(descID), event: event}) } -// batchLogEvents is like logEvent, except it takes in slice of events -// to batch write. -func (p *planner) batchLogEvents( - ctx context.Context, descIDs descpb.IDs, events ...eventpb.EventPayload, -) error { - return p.logEventsWithSystemEventLogOption(ctx, descIDs, true /* writeToEventLog */, events...) +// logEvents is like logEvent, except that it can write multiple +// events simultaneously. This is advantageous for SQL statements +// that produce multiple events, e.g. GRANT, as they will +// processed using only one write batch (and thus lower latency). +func (p *planner) logEvents(ctx context.Context, entries ...eventLogEntry) error { + return p.logEventsWithOptions(ctx, + 2, /* depth: use caller location */ + eventLogOptions{dst: LogEverywhere}, + entries...) } -func (p *planner) logEventOnlyExternally( - ctx context.Context, descID descpb.ID, event eventpb.EventPayload, -) { - // The API contract for logEventWithSystemEventLogOption() is that it returns - // no error when system.eventlog is not written to. - _ = p.logEventsWithSystemEventLogOption(ctx, descpb.IDs{descID}, false /* writeToEventLog */, event) +// eventLogOptions +type eventLogOptions struct { + // Where to emit the log event to. + dst LogEventDestination + + // By default, a copy of each structured event is sent to the DEV + // channel (in addition to its default, nominal channel) if the + // vmodule filter is set to 2 or higher for the source file where + // the event call originates. + // + // If verboseTraceLevel is non-zero, its value is used as value for + // the vmodule filter. See exec_log for an example use. + verboseTraceLevel log.Level } -// logEventsWithSystemEventLogOption is like logEvent() but it gives -// control to the caller as to whether the entry is written into -// system.eventlog. +// logEventsWithOptions is like logEvent() but it gives control to the +// caller as to where the event is written to. // -// If writeToEventLog is false, this function guarantees that it -// returns no error. -func (p *planner) logEventsWithSystemEventLogOption( - ctx context.Context, descIDs descpb.IDs, writeToEventLog bool, events ...eventpb.EventPayload, +// If opts.dst does not include LogToSystemTable, this function is +// guaranteed to not return an error. +func (p *planner) logEventsWithOptions( + ctx context.Context, depth int, opts eventLogOptions, entries ...eventLogEntry, ) error { - user := p.User() - stmt := tree.AsStringWithFQNames(p.stmt.AST, p.extendedEvalCtx.EvalContext.Annotations) - stmtTag := p.stmt.AST.StatementTag() - pl := p.extendedEvalCtx.EvalContext.Placeholders.Values - appName := p.SessionData().ApplicationName - return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, descIDs, user, appName, stmt, stmtTag, pl, writeToEventLog, events...) + commonPayload := sqlEventCommonExecPayload{ + user: p.User(), + stmt: tree.AsStringWithFQNames(p.stmt.AST, p.extendedEvalCtx.EvalContext.Annotations), + stmtTag: p.stmt.AST.StatementTag(), + placeholders: p.extendedEvalCtx.EvalContext.Placeholders.Values, + appName: p.SessionData().ApplicationName, + } + return logEventInternalForSQLStatements(ctx, + p.extendedEvalCtx.ExecCfg, p.txn, + 1+depth, + opts, + commonPayload, + entries...) } // logEventInternalForSchemaChange emits a cluster event in the @@ -93,17 +211,33 @@ func logEventInternalForSchemaChanges( m.MutationID = uint32(mutationID) // Delegate the storing of the event to the regular event logic. - return InsertEventRecord( + // + // We use depth=1 because the caller of this function typically + // wraps the call in a db.Txn() callback, which confuses the vmodule + // filtering. Easiest is to pretend the event is sourced here. + return insertEventRecords( ctx, execCfg.InternalExecutor, txn, - int32(descID), - int32(execCfg.NodeID.SQLInstanceID()), - false, /* skipExternalLog */ - event, - false, /* onlyLog */ + int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + 1, /* depth: use this function as origin */ + eventLogOptions{dst: LogEverywhere}, + eventLogEntry{ + targetID: int32(descID), + event: event, + }, ) } +// sqlEventExecPayload contains the statement and session details +// necessary to populate an eventpb.CommonSQLExecDetails. +type sqlEventCommonExecPayload struct { + user security.SQLUsername + stmt string + stmtTag string + placeholders tree.QueryArguments + appName string +} + // logEventInternalForSQLStatements emits a cluster event on behalf of // a SQL statement, when the point where the event is emitted does not // have access to a (*planner) and the current statement metadata. @@ -116,68 +250,49 @@ func logEventInternalForSQLStatements( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, - descIDs descpb.IDs, - user security.SQLUsername, - appName string, - stmt string, - stmtTag string, - placeholders tree.QueryArguments, - writeToEventLog bool, - events ...eventpb.EventPayload, + depth int, + opts eventLogOptions, + commonPayload sqlEventCommonExecPayload, + entries ...eventLogEntry, ) error { // Inject the common fields into the payload provided by the caller. - for i := range events { - if err := injectCommonFields( - txn, descIDs[i], user, appName, stmt, stmtTag, placeholders, events[i], - ); err != nil { - return err + injectCommonFields := func(entry eventLogEntry) error { + event := entry.event + event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime + sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload) + if !ok { + return errors.AssertionFailedf("unknown event type: %T", event) } + m := sqlCommon.CommonSQLDetails() + m.Statement = commonPayload.stmt + m.Tag = commonPayload.stmtTag + m.ApplicationName = commonPayload.appName + m.User = commonPayload.user.Normalized() + m.DescriptorID = uint32(entry.targetID) + if pls := commonPayload.placeholders; len(pls) > 0 { + m.PlaceholderValues = make([]string, len(pls)) + for idx, val := range pls { + m.PlaceholderValues[idx] = val.String() + } + } + return nil } - // Delegate the storing of the event to the regular event logic. - if !writeToEventLog || !eventLogEnabled.Get(&execCfg.InternalExecutor.s.cfg.Settings.SV) { - return skipWritePath(ctx, txn, events, !writeToEventLog) + for i := range entries { + if err := injectCommonFields(entries[i]); err != nil { + return err + } } - return batchInsertEventRecords(ctx, execCfg.InternalExecutor, - txn, - descIDs, - int32(execCfg.NodeID.SQLInstanceID()), - events..., + return insertEventRecords(ctx, + execCfg.InternalExecutor, txn, + int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + 1+depth, /* depth */ + opts, /* eventLogOptions */ + entries..., /* ...eventLogEntry */ ) } -// injectCommonFields injects the common fields into the event payload provided by the caller. -func injectCommonFields( - txn *kv.Txn, - descID descpb.ID, - user security.SQLUsername, - appName string, - stmt string, - stmtTag string, - placeholders tree.QueryArguments, - event eventpb.EventPayload, -) error { - event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime - sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload) - if !ok { - return errors.AssertionFailedf("unknown event type: %T", event) - } - m := sqlCommon.CommonSQLDetails() - m.Statement = stmt - m.Tag = stmtTag - m.ApplicationName = appName - m.User = user.Normalized() - m.DescriptorID = uint32(descID) - if len(placeholders) > 0 { - m.PlaceholderValues = make([]string, len(placeholders)) - for idx, val := range placeholders { - m.PlaceholderValues[idx] = val.String() - } - } - return nil -} - // LogEventForJobs emits a cluster event in the context of a job. func LogEventForJobs( ctx context.Context, @@ -205,220 +320,204 @@ func LogEventForJobs( m.Description = payload.Description // Delegate the storing of the event to the regular event logic. - return InsertEventRecord( + // + // We use depth=1 because the caller of this function typically + // wraps the call in a db.Txn() callback, which confuses the vmodule + // filtering. Easiest is to pretend the event is sourced here. + return insertEventRecords( ctx, execCfg.InternalExecutor, txn, - 0, /* targetID */ - int32(execCfg.NodeID.SQLInstanceID()), - false, /* skipExternalLog */ - event, - false, /* onlyLog */ + int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ + 1, /* depth: use this function for vmodule filtering */ + eventLogOptions{dst: LogEverywhere}, + eventLogEntry{event: event}, ) } -var eventLogEnabled = settings.RegisterBoolSetting( +var eventLogSystemTableEnabled = settings.RegisterBoolSetting( "server.eventlog.enabled", "if set, logged notable events are also stored in the table system.eventlog", true, ).WithPublic() +// LogEventDestination indicates for InsertEventRecord where the +// event should be directed to. +type LogEventDestination int + +func (d LogEventDestination) hasFlag(f LogEventDestination) bool { + return d&f != 0 +} + +const ( + // LogToSystemTable makes InsertEventRecord write one or more + // entries to the system eventlog table. (This behavior may be + // removed in a later version.) + LogToSystemTable LogEventDestination = 1 << iota + // LogExternally makes InsertEventRecord write the event(s) to the + // external logs. + LogExternally + // LogToDevChannelIfVerbose makes InsertEventRecord copy + // the structured event to the DEV logging channel + // if the vmodule filter for the log call is set high enough. + LogToDevChannelIfVerbose + + // LogEverywhere logs to all the possible outputs. + LogEverywhere LogEventDestination = LogExternally | LogToSystemTable | LogToDevChannelIfVerbose +) + // InsertEventRecord inserts a single event into the event log as part // of the provided transaction, using the provided internal executor. // -// The caller is responsible for populating the timestamp field -// in the event payload. -// -// If the skipExternalLog bool is set, this function does not call -// log.StructuredEvent(). In that case, the caller is responsible for -// calling log.StructuredEvent() directly. -// -// Note: the targetID and reportingID columns are deprecated and -// should be removed after v21.1 is released. -// -// If onlyLog is set, this function guarantees that it returns no -// error. +// This converts to a call to insertEventRecords() with just 1 entry. func InsertEventRecord( ctx context.Context, ex *InternalExecutor, txn *kv.Txn, - targetID, reportingID int32, - skipExternalLog bool, + reportingID int32, + dst LogEventDestination, + targetID int32, info eventpb.EventPayload, - onlyLog bool, ) error { - if onlyLog || !eventLogEnabled.Get(&ex.s.cfg.Settings.SV) { - return skipWritePath(ctx, txn, []eventpb.EventPayload{info}, onlyLog) - } - return batchInsertEventRecords( - ctx, ex, txn, - descpb.IDs{descpb.ID(targetID)}, - reportingID, - info, - ) + // We use depth=1 because the caller of this function typically + // wraps the call in a db.Txn() callback, which confuses the vmodule + // filtering. Easiest is to pretend the event is sourced here. + return insertEventRecords(ctx, ex, txn, reportingID, + 1, /* depth: use this function */ + eventLogOptions{dst: dst}, + eventLogEntry{targetID: targetID, event: info}) } -// batchInsertEventRecords is like InsertEventRecord except it takes -// a slice of events to batch write. Any insert that calls this function -// will always write to the event table (i.e. it won't only log them, and writing -// to the event table will not be disabled). -func batchInsertEventRecords( +// insertEventRecords inserts one or more event into the event log as +// part of the provided txn, using the provided internal executor. +// +// The caller is responsible for populating the timestamp field in the +// event payload and all the other per-payload specific fields. This +// function only takes care of populating the EventType field based on +// the run-time type of the event payload. +// +// Note: the targetID and reportingID columns are deprecated and +// should be removed after v21.1 is released. +func insertEventRecords( ctx context.Context, ex *InternalExecutor, txn *kv.Txn, - descIDs descpb.IDs, reportingID int32, - events ...eventpb.EventPayload, + depth int, + opts eventLogOptions, + entries ...eventLogEntry, ) error { - const colsPerEvent = 5 - const baseQuery = ` -INSERT INTO system.eventlog ( - timestamp, "eventType", "targetID", "reportingID", info -) -VALUES($1, $2, $3, $4, $5)` - args := make([]interface{}, 0, len(events)*colsPerEvent) - - // Prepare first row so we can take the fast path if we're only inserting one event log. - if err := prepareRow( - ctx, txn, &args, events[0], descIDs[0], reportingID, - ); err != nil { - return err - } - if len(events) == 1 { - return execEventLogInsert(ctx, ex, txn, baseQuery, args, len(events)) - } - - var additionalRows strings.Builder - for i := 1; i < len(events); i++ { - var placeholderNum = 1 + (i * colsPerEvent) - if err := prepareRow(ctx, txn, &args, events[i], descIDs[i], reportingID); err != nil { - return err + // Finish populating the entries. + for i := range entries { + // Ensure the type field is populated. + event := entries[i].event + eventType := eventpb.GetEventTypeName(event) + event.CommonDetails().EventType = eventType + + // The caller is responsible for the timestamp field. + if event.CommonDetails().Timestamp == 0 { + return errors.AssertionFailedf("programming error: timestamp field in event %d not populated: %T", i, event) } - additionalRows.WriteString(fmt.Sprintf(", ($%d, $%d, $%d, $%d, $%d)", - placeholderNum, placeholderNum+1, placeholderNum+2, placeholderNum+3, placeholderNum+4)) - } - - rows, err := ex.Exec(ctx, "log-event", txn, baseQuery+additionalRows.String(), args...) - if err != nil { - return err - } - if rows != len(events) { - return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(events)) } - return nil -} -// skipWritePath is used when either onlyLog is true, or writes to the event log -// table are disabled. In these cases, we do not write to the event log table. -func skipWritePath( - ctx context.Context, txn *kv.Txn, events []eventpb.EventPayload, onlyLog bool, -) error { - for i := range events { - if err := setupEventAndMaybeLog( - ctx, txn, events[i], onlyLog, - ); err != nil { - return err + if opts.dst.hasFlag(LogToDevChannelIfVerbose) { + // Emit a copy of the structured to the DEV channel when the + // vmodule setting matches. + level := log.Level(2) + if opts.verboseTraceLevel != 0 { + // Caller has overridden the level at which which log to the + // trace. + level = opts.verboseTraceLevel + } + if log.VDepth(level, depth) { + // The VDepth() call ensures that we are matching the vmodule + // setting to where the depth is equal to 1 in the caller stack. + for i := range entries { + log.InfofDepth(ctx, depth, "SQL event: target %d, payload %+v", entries[i].targetID, entries[i].event) + } } - } - return nil -} - -// setupEventAndMaybeLog prepares the event log to be written. Also, -// if onlyLog is true, it will log the event. -func setupEventAndMaybeLog( - ctx context.Context, txn *kv.Txn, info eventpb.EventPayload, onlyLog bool, -) error { - eventType := eventpb.GetEventTypeName(info) - - // Ensure the type field is populated. - info.CommonDetails().EventType = eventType - - // The caller is responsible for the timestamp field. - if info.CommonDetails().Timestamp == 0 { - return errors.AssertionFailedf("programming error: timestamp field in event not populated: %T", info) } - // If we only want to log and not write to the events table, early exit. - if onlyLog { - log.StructuredEvent(ctx, info) + // If we only want to log externally and not write to the events table, early exit. + loggingToSystemTable := opts.dst.hasFlag(LogToSystemTable) && eventLogSystemTableEnabled.Get(&ex.s.cfg.Settings.SV) + if !loggingToSystemTable { + // Simply emit the events to their respective channels and call it a day. + if opts.dst.hasFlag(LogExternally) { + for i := range entries { + log.StructuredEvent(ctx, entries[i].event) + } + } + // Not writing to system table: shortcut. return nil } - // Ensure that the external logging sees the event when the - // transaction commits. - txn.AddCommitTrigger(func(ctx context.Context) { - log.StructuredEvent(ctx, info) - }) + // When logging to the system table, ensure that the external + // logging only sees the event when the transaction commits. + if opts.dst.hasFlag(LogExternally) { + txn.AddCommitTrigger(func(ctx context.Context) { + for i := range entries { + log.StructuredEvent(ctx, entries[i].event) + } + }) + } - return nil -} + // The function below this point is specialized to write to the + // system table. -// constructArgs constructs the values for a single event-log row insert. -func constructArgs( - args *[]interface{}, - event eventpb.EventPayload, - eventType string, - descID descpb.ID, - reportingID int32, -) error { - *args = append( - *args, - timeutil.Unix(0, event.CommonDetails().Timestamp), - eventType, int32(descID), - reportingID, - ) - var info interface{} - if event != nil { + const colsPerEvent = 5 + const baseQuery = ` +INSERT INTO system.eventlog ( + timestamp, "eventType", "targetID", "reportingID", info +) +VALUES($1, $2, $3, $4, $5)` + args := make([]interface{}, 0, len(entries)*colsPerEvent) + constructArgs := func(reportingID int32, entry eventLogEntry) error { + event := entry.event infoBytes, err := json.Marshal(event) if err != nil { return err } - info = string(infoBytes) + eventType := eventpb.GetEventTypeName(event) + args = append( + args, + timeutil.Unix(0, event.CommonDetails().Timestamp), + eventType, + entry.targetID, + reportingID, + string(infoBytes), + ) + return nil } - *args = append(*args, info) - return nil -} -// execEventLogInsert executes the insert query to insert the new events -// into the event log table. -func execEventLogInsert( - ctx context.Context, - ex *InternalExecutor, - txn *kv.Txn, - query string, - args []interface{}, - numEvents int, -) error { - rows, err := ex.Exec(ctx, "log-event", txn, query, args...) - if err != nil { + // In the common case where we have just 1 event, we want to skeep + // the extra heap allocation and buffer operations of the loop + // below. This is an optimization. + query := baseQuery + if err := constructArgs(reportingID, entries[0]); err != nil { return err } - if rows != numEvents { - return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, numEvents) + if len(entries) > 1 { + // Extend the query with additional VALUES clauses for all the + // events after the first one. + var completeQuery strings.Builder + completeQuery.WriteString(baseQuery) + + for _, extraEntry := range entries[1:] { + placeholderNum := 1 + len(args) + if err := constructArgs(reportingID, extraEntry); err != nil { + return err + } + fmt.Fprintf(&completeQuery, ", ($%d, $%d, $%d, $%d, $%d)", + placeholderNum, placeholderNum+1, placeholderNum+2, placeholderNum+3, placeholderNum+4) + } + query = completeQuery.String() } - return nil -} -// prepareRow creates the values of an insert for a row. It populates the -// event payload with additional info, and then adds the values of the row to args. -func prepareRow( - ctx context.Context, - txn *kv.Txn, - args *[]interface{}, - event eventpb.EventPayload, - descID descpb.ID, - reportingID int32, -) error { - // Setup event log. - eventType := eventpb.GetEventTypeName(event) - if err := setupEventAndMaybeLog( - ctx, txn, event, false, /* onlyLog */ - ); err != nil { + rows, err := ex.Exec(ctx, "log-event", txn, query, args...) + if err != nil { return err } - - // Construct the args for this row. - if err := constructArgs(args, event, eventType, descID, reportingID); err != nil { - return err + if rows != len(entries) { + return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(entries)) } return nil } diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 75a3a3cf62f3..f95b2733ba02 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -276,14 +276,10 @@ func (p *planner) maybeLogStatementInternal( TxnCounter: uint32(txnCounter), } - if logV { - // Copy to the debug log / trace. - log.VEventf(ctx, execType.vLevel(), "%+v", execDetails) - } - if auditEventsDetected { // TODO(knz): re-add the placeholders and age into the logging event. - for _, ev := range p.curPlan.auditEvents { + entries := make([]eventLogEntry, len(p.curPlan.auditEvents)) + for i, ev := range p.curPlan.auditEvents { mode := "r" if ev.writing { mode = "rw" @@ -303,15 +299,18 @@ func (p *planner) maybeLogStatementInternal( tableName = tn.FQString() } } - - p.logEventOnlyExternally(ctx, ev.desc.GetID(), - &eventpb.SensitiveTableAccess{ + entries[i] = eventLogEntry{ + targetID: int32(ev.desc.GetID()), + event: &eventpb.SensitiveTableAccess{ CommonSQLExecDetails: execDetails, TableName: tableName, AccessMode: mode, - }) + }, + } } + p.logEventsOnlyExternally(ctx, entries...) } + if slowQueryLogEnabled && ( // Did the user request pumping queries into the slow query log when // the logical plan has full scans? @@ -321,28 +320,44 @@ func (p *planner) maybeLogStatementInternal( switch { case execType == executorTypeExec: // Non-internal queries are always logged to the slow query log. - p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}}) case execType == executorTypeInternal && slowInternalQueryLogEnabled: // Internal queries that surpass the slow query log threshold should only // be logged to the slow-internal-only log if the cluster setting dictates. - p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}}) } } - if logExecuteEnabled { - p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.QueryExecute{CommonSQLExecDetails: execDetails}) + if logExecuteEnabled || logV { + // The API contract for logEventsWithOptions() is that it returns + // no error when system.eventlog is not written to. + _ = p.logEventsWithOptions(ctx, + 1, /* depth */ + eventLogOptions{ + // We pass LogToDevChannelIfVerbose because we have a log.V + // request for this file, which means the operator wants to + // see a copy of the execution on the DEV Channel. + dst: LogExternally | LogToDevChannelIfVerbose, + verboseTraceLevel: execType.vLevel(), + }, + eventLogEntry{event: &eventpb.QueryExecute{CommonSQLExecDetails: execDetails}}) } if shouldLogToAdminAuditLog { - p.logEventOnlyExternally(ctx, 0, /* log event not trigged by descriptor */ - &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}}) } } +func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventLogEntry) { + // The API contract for logEventsWithOptions() is that it returns + // no error when system.eventlog is not written to. + _ = p.logEventsWithOptions(ctx, + 2, /* depth: we want to use the caller location */ + eventLogOptions{dst: LogExternally}, + entries...) +} + // maybeAudit marks the current plan being constructed as flagged // for auditing if the table being touched has an auditing mode set. // This is later picked up by maybeLogStatement() above. diff --git a/pkg/sql/grant_revoke.go b/pkg/sql/grant_revoke.go index 1d9a7b1ec5d0..ad5ebfe90a15 100644 --- a/pkg/sql/grant_revoke.go +++ b/pkg/sql/grant_revoke.go @@ -177,12 +177,7 @@ func (n *changePrivilegesNode) startExec(params runParams) error { return err } - // The events to log at the end. - type eventEntry struct { - descID descpb.ID - event eventpb.EventPayload - } - var events []eventEntry + var events []eventLogEntry // First, update the descriptors. We want to catch all errors before // we update them in KV below. @@ -239,8 +234,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeDatabasePrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeDatabasePrivilege{ CommonSQLPrivilegeEventDetails: privs, DatabaseName: (*tree.Name)(&d.Name).String(), }}) @@ -264,8 +260,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeTablePrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeTablePrivilege{ CommonSQLPrivilegeEventDetails: privs, TableName: d.Name, // FIXME }}) @@ -278,8 +275,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeTypePrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeTypePrivilege{ CommonSQLPrivilegeEventDetails: privs, TypeName: d.Name, // FIXME }}) @@ -295,8 +293,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { for _, grantee := range n.grantees { privs := eventDetails // copy the granted/revoked privilege list. privs.Grantee = grantee.Normalized() - events = append(events, eventEntry{d.ID, - &eventpb.ChangeSchemaPrivilege{ + events = append(events, eventLogEntry{ + targetID: int32(d.ID), + event: &eventpb.ChangeSchemaPrivilege{ CommonSQLPrivilegeEventDetails: privs, SchemaName: d.Name, // FIXME }}) @@ -312,13 +311,7 @@ func (n *changePrivilegesNode) startExec(params runParams) error { // Record the privilege changes in the event log. This is an // auditable log event and is recorded in the same transaction as // the table descriptor update. - descIDs := make(descpb.IDs, 0, len(events)) - eventPayloads := make([]eventpb.EventPayload, 0, len(events)) - for _, ev := range events { - descIDs = append(descIDs, ev.descID) - eventPayloads = append(eventPayloads, ev.event) - } - if err := params.p.batchLogEvents(params.ctx, descIDs, eventPayloads...); err != nil { + if err := params.p.logEvents(params.ctx, events...); err != nil { return err } return nil