Skip to content

Commit

Permalink
sql: batch write event logs for grant/revoke
Browse files Browse the repository at this point in the history
Helps with #41930.

Previously, if we ran grant/revoke on multiple tables,
we would create event logs for each table and write them
one by one, resulting in round trips proportional to
the number of tables.
This patch addresses this by batch writing the event logs,
so that 1 write to the event log table occurs regardless
of the number of tables updated.

Release note: None
  • Loading branch information
the-ericwang35 committed Apr 22, 2021
1 parent b8553b7 commit 5813172
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 55 deletions.
8 changes: 4 additions & 4 deletions pkg/bench/ddl_analysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ exp,benchmark
40,DropView/drop_2_views
57,DropView/drop_3_views
19,Grant/grant_all_on_1_table
23,Grant/grant_all_on_2_tables
27,Grant/grant_all_on_3_tables
22,Grant/grant_all_on_2_tables
25,Grant/grant_all_on_3_tables
19,GrantRole/grant_1_role
22,GrantRole/grant_2_roles
2,ORMQueries/activerecord_type_introspection_query
Expand All @@ -61,8 +61,8 @@ exp,benchmark
2,ORMQueries/pg_namespace
2,ORMQueries/pg_type
19,Revoke/revoke_all_on_1_table
23,Revoke/revoke_all_on_2_tables
27,Revoke/revoke_all_on_3_tables
22,Revoke/revoke_all_on_2_tables
25,Revoke/revoke_all_on_3_tables
18,RevokeRole/revoke_1_role
20,RevokeRole/revoke_2_roles
1,SystemDatabaseQueries/select_system.users_with_empty_database_name
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/create_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,15 +582,15 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er
// See: https://github.com/cockroachdb/cockroach/issues/57739
return evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return logEventInternalForSQLStatements(ctx, evalCtx.ExecCfg, txn,
details.Table.ID,
descpb.IDs{details.Table.ID},
evalCtx.SessionData.User(),
evalCtx.SessionData.ApplicationName,
details.Statement,
nil, /* no placeholders known at this point */
nil, /* no placeholders known at this point */
true, /* writeToEventLog */
&eventpb.CreateStatistics{
TableName: details.FQTableName,
},
true, /* writeToEventLog */
)
})
}
Expand Down
239 changes: 194 additions & 45 deletions pkg/sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package sql
import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -31,35 +33,41 @@ import (
// logEvent emits a cluster event in the context of a regular SQL
// statement.
func (p *planner) logEvent(
ctx context.Context, descID descpb.ID, event eventpb.EventPayload,
ctx context.Context, descID descpb.ID, events eventpb.EventPayload,
) error {
return p.logEventsWithSystemEventLogOption(ctx, descpb.IDs{descID}, true /* writeToEventLog */, events)
}

// batchLogEvents is like logEvent, except it takes in slice of events
// to batch write.
func (p *planner) batchLogEvents(
ctx context.Context, descIDs descpb.IDs, events ...eventpb.EventPayload,
) error {
return p.logEventWithSystemEventLogOption(ctx, descID, event, true /* writeToEventLog */)
return p.logEventsWithSystemEventLogOption(ctx, descIDs, true /* writeToEventLog */, events...)
}

func (p *planner) logEventOnlyExternally(
ctx context.Context, descID descpb.ID, event eventpb.EventPayload,
) {
// The API contract for logEventWithSystemEventLogOption() is that it returns
// no error when system.eventlog is not written to.
_ = p.logEventWithSystemEventLogOption(ctx, descID, event, false /* writeToEventLog */)
_ = p.logEventsWithSystemEventLogOption(ctx, descpb.IDs{descID}, false /* writeToEventLog */, event)
}

// logEventWithSystemEventLogOption is like logEvent() but it gives
// logEventsWithSystemEventLogOption is like logEvent() but it gives
// control to the caller as to whether the entry is written into
// system.eventlog.
//
// If writeToEventLog is false, this function guarantees that it
// returns no error.
func (p *planner) logEventWithSystemEventLogOption(
ctx context.Context, descID descpb.ID, event eventpb.EventPayload, writeToEventLog bool,
func (p *planner) logEventsWithSystemEventLogOption(
ctx context.Context, descIDs descpb.IDs, writeToEventLog bool, events ...eventpb.EventPayload,
) error {
// Compute the common fields from data already known to the planner.
user := p.User()
stmt := tree.AsStringWithFQNames(p.stmt.AST, p.extendedEvalCtx.EvalContext.Annotations)
pl := p.extendedEvalCtx.EvalContext.Placeholders.Values
appName := p.SessionData().ApplicationName

return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, descID, user, appName, stmt, pl, event, writeToEventLog)
return logEventInternalForSQLStatements(ctx, p.extendedEvalCtx.ExecCfg, p.txn, descIDs, user, appName, stmt, pl, writeToEventLog, events...)
}

// logEventInternalForSchemaChange emits a cluster event in the
Expand Down Expand Up @@ -107,15 +115,46 @@ func logEventInternalForSQLStatements(
ctx context.Context,
execCfg *ExecutorConfig,
txn *kv.Txn,
descID descpb.ID,
descIDs descpb.IDs,
user security.SQLUsername,
appName string,
stmt string,
placeholders tree.QueryArguments,
event eventpb.EventPayload,
writeToEventLog bool,
events ...eventpb.EventPayload,
) error {
// Inject the common fields into the payload provided by the caller.
for i := range events {
if err := injectCommonFields(
txn, descIDs[i], user, appName, stmt, placeholders, events[i],
); err != nil {
return err
}
}

// Delegate the storing of the event to the regular event logic.
if !writeToEventLog || !eventLogEnabled.Get(&execCfg.InternalExecutor.s.cfg.Settings.SV) {
return skipWritePath(ctx, txn, events, !writeToEventLog)
}

return batchInsertEventRecords(ctx, execCfg.InternalExecutor,
txn,
descIDs,
int32(execCfg.NodeID.SQLInstanceID()),
events...,
)
}

// injectCommonFields injects the common fields into the event payload provided by the caller.
func injectCommonFields(
txn *kv.Txn,
descID descpb.ID,
user security.SQLUsername,
appName string,
stmt string,
placeholders tree.QueryArguments,
event eventpb.EventPayload,
) error {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload)
if !ok {
Expand All @@ -132,16 +171,7 @@ func logEventInternalForSQLStatements(
m.PlaceholderValues[idx] = val.String()
}
}

// Delegate the storing of the event to the regular event logic.
return InsertEventRecord(ctx, execCfg.InternalExecutor,
txn,
int32(descID),
int32(execCfg.NodeID.SQLInstanceID()),
false, /* skipExternalLog */
event,
!writeToEventLog,
)
return nil
}

// LogEventForJobs emits a cluster event in the context of a job.
Expand Down Expand Up @@ -211,6 +241,87 @@ func InsertEventRecord(
skipExternalLog bool,
info eventpb.EventPayload,
onlyLog bool,
) error {
if onlyLog || !eventLogEnabled.Get(&ex.s.cfg.Settings.SV) {
return skipWritePath(ctx, txn, []eventpb.EventPayload{info}, onlyLog)
}
return batchInsertEventRecords(
ctx, ex, txn,
descpb.IDs{descpb.ID(targetID)},
reportingID,
info,
)
}

// batchInsertEventRecords is like InsertEventRecord except it takes
// a slice of events to batch write. Any insert that calls this function
// will always write to the event table (i.e. it won't only log them, and writing
// to the event table will not be disabled).
func batchInsertEventRecords(
ctx context.Context,
ex *InternalExecutor,
txn *kv.Txn,
descIDs descpb.IDs,
reportingID int32,
events ...eventpb.EventPayload,
) error {
const colsPerEvent = 5
const baseQuery = `
INSERT INTO system.eventlog (
timestamp, "eventType", "targetID", "reportingID", info
)
VALUES($1, $2, $3, $4, $5)`
args := make([]interface{}, 0, len(events)*colsPerEvent)

// Prepare first row so we can take the fast path if we're only inserting one event log.
if err := prepareRow(
ctx, txn, &args, events[0], descIDs[0], reportingID,
); err != nil {
return err
}
if len(events) == 1 {
return execEventLogInsert(ctx, ex, txn, baseQuery, args, len(events))
}

var additionalRows strings.Builder
for i := 1; i < len(events); i++ {
var placeholderNum = 1 + (i * colsPerEvent)
if err := prepareRow(ctx, txn, &args, events[i], descIDs[i], reportingID); err != nil {
return err
}
additionalRows.WriteString(fmt.Sprintf(", ($%d, $%d, $%d, $%d, $%d)",
placeholderNum, placeholderNum+1, placeholderNum+2, placeholderNum+3, placeholderNum+4))
}

rows, err := ex.Exec(ctx, "log-event", txn, baseQuery+additionalRows.String(), args...)
if err != nil {
return err
}
if rows != len(events) {
return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(events))
}
return nil
}

// skipWritePath is used when either onlyLog is true, or writes to the event log
// table are disabled. In these cases, we do not write to the event log table.
func skipWritePath(
ctx context.Context, txn *kv.Txn, events []eventpb.EventPayload, onlyLog bool,
) error {
for i := range events {
if err := setupEventAndMaybeLog(
ctx, txn, events[i], onlyLog,
); err != nil {
return err
}
}
return nil
}

// setupEventAndMaybeLog prepares the event log to be written. Also,
// if onlyLog is true, it will log the event.
func setupEventAndMaybeLog(
ctx context.Context, txn *kv.Txn, info eventpb.EventPayload, onlyLog bool,
) error {
eventType := eventpb.GetEventTypeName(info)

Expand All @@ -222,6 +333,7 @@ func InsertEventRecord(
return errors.AssertionFailedf("programming error: timestamp field in event not populated: %T", info)
}

// If we only want to log and not write to the events table, early exit.
if onlyLog {
log.StructuredEvent(ctx, info)
return nil
Expand All @@ -233,39 +345,76 @@ func InsertEventRecord(
log.StructuredEvent(ctx, info)
})

// If writes to the event log table are disabled, take a shortcut.
if !eventLogEnabled.Get(&ex.s.cfg.Settings.SV) {
return nil
}
return nil
}

const insertEventTableStmt = `
INSERT INTO system.eventlog (
timestamp, "eventType", "targetID", "reportingID", info
)
VALUES(
$1, $2, $3, $4, $5
)
`
args := []interface{}{
timeutil.Unix(0, info.CommonDetails().Timestamp),
eventType,
targetID,
// constructArgs constructs the values for a single event-log row insert.
func constructArgs(
args *[]interface{},
event eventpb.EventPayload,
eventType string,
descID descpb.ID,
reportingID int32,
) error {
*args = append(
*args,
timeutil.Unix(0, event.CommonDetails().Timestamp),
eventType, int32(descID),
reportingID,
nil, // info
}
if info != nil {
infoBytes, err := json.Marshal(info)
)
var info interface{}
if event != nil {
infoBytes, err := json.Marshal(event)
if err != nil {
return err
}
args[4] = string(infoBytes)
info = string(infoBytes)
}
rows, err := ex.Exec(ctx, "log-event", txn, insertEventTableStmt, args...)
*args = append(*args, info)
return nil
}

// execEventLogInsert executes the insert query to insert the new events
// into the event log table.
func execEventLogInsert(
ctx context.Context,
ex *InternalExecutor,
txn *kv.Txn,
query string,
args []interface{},
numEvents int,
) error {
rows, err := ex.Exec(ctx, "log-event", txn, query, args...)
if err != nil {
return err
}
if rows != 1 {
return errors.Errorf("%d rows affected by log insertion; expected exactly one row affected.", rows)
if rows != numEvents {
return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, numEvents)
}
return nil
}

// prepareRow creates the values of an insert for a row. It populates the
// event payload with additional info, and then adds the values of the row to args.
func prepareRow(
ctx context.Context,
txn *kv.Txn,
args *[]interface{},
event eventpb.EventPayload,
descID descpb.ID,
reportingID int32,
) error {
// Setup event log.
eventType := eventpb.GetEventTypeName(event)
if err := setupEventAndMaybeLog(
ctx, txn, event, false, /* onlyLog */
); err != nil {
return err
}

// Construct the args for this row.
if err := constructArgs(args, event, eventType, descID, reportingID); err != nil {
return err
}
return nil
}
10 changes: 7 additions & 3 deletions pkg/sql/grant_revoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,14 @@ func (n *changePrivilegesNode) startExec(params runParams) error {
// Record the privilege changes in the event log. This is an
// auditable log event and is recorded in the same transaction as
// the table descriptor update.
descIDs := make(descpb.IDs, 0, len(events))
eventPayloads := make([]eventpb.EventPayload, 0, len(events))
for _, ev := range events {
if err := params.p.logEvent(params.ctx, ev.descID, ev.event); err != nil {
return err
}
descIDs = append(descIDs, ev.descID)
eventPayloads = append(eventPayloads, ev.event)
}
if err := params.p.batchLogEvents(params.ctx, descIDs, eventPayloads...); err != nil {
return err
}
return nil
}
Expand Down
Loading

0 comments on commit 5813172

Please sign in to comment.