-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathevent_log.go
773 lines (711 loc) · 26 KB
/
event_log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"encoding/binary"
"fmt"
"strings"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
v1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1"
otel_logs_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scbuild"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)
// The logging functions in this file are the different stages of a
// pipeline that add more and more information to logging events until
// they are ready to be sent to either external sinks or to a system
// table.
//
// The overall structure of this pipeline is as follows:
//
// regular statement execution
// for "special" statements that
// have structured logging, e.g. CREATE, DROP etc
// |
// (produces pair(s) of descID (optional) and logpb.EventPayload)
// |
// v
// logEvent(descID, payload) / logEvents(eventEntries...)
// |
// | ,------- query logging in exec_log.go
// | / optionally via logEventOnlyExternally()
// | /
// v v
// logEventsWithOptions()
// |
// (extracts SQL exec details
// from execution context - see sqlEventCommonExecPayload)
// |
// | ,----------- async CREATE STATS
// | / goroutine
// | / on behalf of CREATE STATS stmt
// v v
// logEventInternalForSQLStatements()
// | (SQL exec details struct
// | and main event struct provided
// | separately as arguments)
// |
// (writes the exec details
// inside the event struct)
// |
// | ,----- job execution, at end
// | |
// | LogEventForJobs()
// | |
// | (add job ID,
// | + fields from job metadata
// | timestamp initialized at job txn read ts)
// | |
// | ,-----------------'
// | /
// | / ,------- async schema change
// | | | execution, at end
// | | v
// | | logEventInternalForSchemaChanges()
// | | |
// | | (add mutation ID,
// | | + fields from sc.change metadata
// | | timestamp initialized to txn read ts)
// | | |
// | | ,-------------'
// | | /
// | | /
// | | |
// | | | ,-------- node-level events outside of SQL
// | | | / (e.g. cluster membership)
// | | | /
// v v v v
// (expectation: per-type event structs
// fully populated at this point.
// Timestamp field must be set too.)
// |
// v
// InsertEventRecords() / insertEventRecords()
// |
// (finalize field EventType from struct type)
// |
// (route)
// |
// +--> system.eventlog if not disabled by setting
// | └ also the Obs Service, if connected
// |
// +--> DEV channel if requested by log.V
// |
// `--> external sinks (via logging package)
//
//
// logEvent records a single cluster event on behalf of the SQL
// statement currently executed by the planner.
//
// When the payload carries the common SQL details, descID is stored
// in them as the descriptor ID. The event is then handed over to
// logEventsWithOptions with LogEverywhere as destination.
func (p *planner) logEvent(ctx context.Context, descID descpb.ID, event logpb.EventPayload) error {
	withSQLDetails, hasSQLDetails := event.(eventpb.EventWithCommonSQLPayload)
	if hasSQLDetails {
		withSQLDetails.CommonSQLDetails().DescriptorID = uint32(descID)
	}

	const callerDepth = 2 // depth: use caller location
	logOpts := eventLogOptions{dst: LogEverywhere}
	return p.logEventsWithOptions(ctx, callerDepth, logOpts, event)
}
// logEvents is the multi-event variant of logEvent: all the given
// entries are forwarded together in a single call. This benefits SQL
// statements that produce several events at once (e.g. GRANT), since
// the events can then be processed using only one write batch, and
// thus with lower latency.
func (p *planner) logEvents(ctx context.Context, entries ...logpb.EventPayload) error {
	const callerDepth = 2 // depth: use caller location
	logOpts := eventLogOptions{dst: LogEverywhere}
	return p.logEventsWithOptions(ctx, callerDepth, logOpts, entries...)
}
// eventLogOptions groups the knobs accepted by the event logging
// functions in this file: where the events are sent, at which
// verbosity level they are copied to the DEV channel, how the SQL
// statement is redacted, and whether the events originate from COPY.
type eventLogOptions struct {
// Where to emit the log event to.
dst LogEventDestination
// By default, a copy of each structured event is sent to the DEV
// channel (in addition to its default, nominal channel) if the
// vmodule filter is set to 2 or higher for the source file where
// the event call originates.
//
// If verboseTraceLevel is non-zero, its value is used as value for
// the vmodule filter. See exec_log for an example use.
verboseTraceLevel log.Level
// Additional redaction options, if necessary. The planner uses them
// to select the formatting flags of the statement text stored in the
// common SQL event details.
rOpts redactionOptions
// isCopy notes whether the current event is related to COPY. When
// set, logEventInternalForSQLStatements stamps the event with the
// current time instead of the txn read timestamp.
isCopy bool
}
// redactionOptions describes how the SQL text attached to an event
// must be redacted.
type redactionOptions struct {
	// omitSQLNameRedaction selects tree.FmtOmitNameRedaction as the
	// statement formatting flags (see toFlags).
	omitSQLNameRedaction bool
}

// toFlags translates the redaction options into the tree.FmtFlags used
// to format the statement: tree.FmtOmitNameRedaction when
// omitSQLNameRedaction is set, tree.FmtSimple otherwise.
func (ro *redactionOptions) toFlags() tree.FmtFlags {
	switch {
	case ro.omitSQLNameRedaction:
		return tree.FmtOmitNameRedaction
	default:
		return tree.FmtSimple
	}
}

// defaultRedactionOptions is the zero-value configuration, in which
// omitSQLNameRedaction is not set.
var defaultRedactionOptions = redactionOptions{}
// getCommonSQLEventDetails assembles the eventpb.CommonSQLEventDetails
// shared by the events emitted for the planner's current statement:
// the redactable statement text (formatted with the flags derived from
// opt), the statement tag, the normalized session user, the
// application name and, when the statement has placeholder values,
// their string representations.
func (p *planner) getCommonSQLEventDetails(opt redactionOptions) eventpb.CommonSQLEventDetails {
	stmtText := formatStmtKeyAsRedactableString(
		p.extendedEvalCtx.VirtualSchemas, p.stmt.AST,
		p.extendedEvalCtx.Context.Annotations, opt.toFlags(), p,
	)

	var details eventpb.CommonSQLEventDetails
	details.Statement = stmtText
	details.Tag = p.stmt.AST.StatementTag()
	details.User = p.SessionData().SessionUser().Normalized()
	details.ApplicationName = p.SessionData().ApplicationName

	placeholders := p.extendedEvalCtx.Context.Placeholders.Values
	if len(placeholders) == 0 {
		// No placeholders: PlaceholderValues is left unset.
		return details
	}
	rendered := make([]string, 0, len(placeholders))
	for _, val := range placeholders {
		rendered = append(rendered, val.String())
	}
	details.PlaceholderValues = rendered
	return details
}
// logEventsWithOptions is like logEvents() but lets the caller decide,
// via opts, where the events are written to and how they are redacted.
//
// depth is the call depth of the code location that the DEV channel
// copy of the events is attributed to, relative to the caller of this
// function.
//
// If opts.dst does not include LogToSystemTable, the only errors that
// can be returned are assertion failures caused by malformed events.
func (p *planner) logEventsWithOptions(
	ctx context.Context, depth int, opts eventLogOptions, entries ...logpb.EventPayload,
) error {
	execCfg := p.extendedEvalCtx.ExecCfg
	txn := p.InternalSQLTxn()
	// The SQL execution details are extracted from the planner once and
	// shared by all the entries.
	sqlDetails := p.getCommonSQLEventDetails(opts.rOpts)
	return logEventInternalForSQLStatements(
		ctx, execCfg, txn, 1+depth, opts, sqlDetails, entries...,
	)
}
// logEventInternalForSchemaChanges emits a cluster event in the
// context of a schema changer.
//
// The event timestamp is set to the read timestamp of txn, and the
// common schema change details of the event receive the SQL instance
// ID, the descriptor ID and the mutation ID given as arguments. An
// assertion error is returned if the event does not carry common
// schema change details.
func logEventInternalForSchemaChanges(
	ctx context.Context,
	execCfg *ExecutorConfig,
	txn isql.Txn,
	sqlInstanceID base.SQLInstanceID,
	descID descpb.ID,
	mutationID descpb.MutationID,
	event logpb.EventPayload,
) error {
	event.CommonDetails().Timestamp = txn.KV().ReadTimestamp().WallTime

	withSCDetails, isSCEvent := event.(eventpb.EventWithCommonSchemaChangePayload)
	if !isSCEvent {
		return errors.AssertionFailedf("unknown event type: %T", event)
	}
	details := withSCDetails.CommonSchemaChangeDetails()
	details.InstanceID = int32(sqlInstanceID)
	details.DescriptorID = uint32(descID)
	details.MutationID = uint32(mutationID)

	// The storing of the event is delegated to the regular event logic.
	//
	// depth=1 is used because the caller of this function typically
	// wraps the call in a db.Txn() callback, which confuses the vmodule
	// filtering. The easiest is to pretend the event is sourced here.
	const depth = 1 // depth: use this function as origin
	logOpts := eventLogOptions{dst: LogEverywhere}
	return insertEventRecords(ctx, execCfg, txn, depth, logOpts, event)
}
// logEventInternalForSQLStatements emits cluster events on behalf of
// a SQL statement, when the point where the events are emitted does
// not have access to a (*planner) and the current statement metadata.
//
// Every entry gets its timestamp set (the current time when
// opts.isCopy is set, the read timestamp of txn otherwise) and its
// common SQL details overwritten with commonSQLEventDetails. If the
// DescriptorID field of an entry is already populated and
// commonSQLEventDetails does not provide one, the entry's value is
// preserved.
//
// Note: usage of this interface should be minimized.
//
// An assertion error is returned if an entry does not carry common
// SQL details; entries that precede it have been modified already at
// that point.
func logEventInternalForSQLStatements(
	ctx context.Context,
	execCfg *ExecutorConfig,
	txn isql.Txn,
	depth int,
	opts eventLogOptions,
	commonSQLEventDetails eventpb.CommonSQLEventDetails,
	entries ...logpb.EventPayload,
) error {
	// Inject the common fields into the payloads provided by the caller.
	for _, event := range entries {
		if opts.isCopy {
			// No txn is set for COPY, so use now instead.
			event.CommonDetails().Timestamp = timeutil.Now().UnixNano()
		} else {
			event.CommonDetails().Timestamp = txn.KV().ReadTimestamp().WallTime
		}

		withSQLDetails, isSQLEvent := event.(eventpb.EventWithCommonSQLPayload)
		if !isSQLEvent {
			return errors.AssertionFailedf("unknown event type: %T", event)
		}
		target := withSQLDetails.CommonSQLDetails()

		// The shared details replace whatever the event contained, with
		// one exception: the caller may have populated the event's
		// DescriptorID already.
		//  - if the shared details carry a descriptor ID, that one wins;
		//  - otherwise the value found in the event is kept.
		merged := commonSQLEventDetails
		if merged.DescriptorID == 0 {
			merged.DescriptorID = target.DescriptorID
		}
		*target = merged
	}

	return insertEventRecords(ctx, execCfg, txn, 1+depth /* depth */, opts, entries...)
}
// schemaChangerEventLogger implements both scrun.EventLogger and
// scbuild.EventLogger on top of the event logging functions defined in
// this file.
type schemaChangerEventLogger struct {
// txn is the transaction passed to the logging functions; its read
// timestamp is used as event timestamp by LogEventForSchemaChange.
txn isql.Txn
// execCfg is passed through to the logging functions.
execCfg *ExecutorConfig
// depth is the call depth that LogEvent passes to
// logEventInternalForSQLStatements.
depth int
}
// Compile-time checks that schemaChangerEventLogger satisfies both
// event logger interfaces.
var _ scrun.EventLogger = (*schemaChangerEventLogger)(nil)
var _ scbuild.EventLogger = (*schemaChangerEventLogger)(nil)
// NewSchemaChangerBuildEventLogger returns a scbuild.EventLogger
// implementation that logs through txn and execCfg, using a call depth
// of 1.
func NewSchemaChangerBuildEventLogger(txn isql.Txn, execCfg *ExecutorConfig) scbuild.EventLogger {
	logger := schemaChangerEventLogger{txn: txn, execCfg: execCfg, depth: 1}
	return &logger
}
// NewSchemaChangerRunEventLogger returns a scrun.EventLogger
// implementation that logs through txn and execCfg, using a call depth
// of 0.
func NewSchemaChangerRunEventLogger(txn isql.Txn, execCfg *ExecutorConfig) scrun.EventLogger {
	logger := schemaChangerEventLogger{txn: txn, execCfg: execCfg, depth: 0}
	return &logger
}
// LogEvent implements the scbuild.EventLogger interface. The event is
// completed with the given common SQL details and sent to all the
// destinations (LogEverywhere), using the logger's txn, executor
// configuration and call depth.
func (l *schemaChangerEventLogger) LogEvent(
	ctx context.Context, details eventpb.CommonSQLEventDetails, event logpb.EventPayload,
) error {
	logOpts := eventLogOptions{dst: LogEverywhere}
	return logEventInternalForSQLStatements(
		ctx, l.execCfg, l.txn, l.depth, logOpts, details, event,
	)
}
// LogEventForSchemaChange implements the scrun.EventLogger interface.
//
// The event timestamp is set to the read timestamp of the logger's
// txn and the instance ID in the common schema change details is set
// to the SQL instance ID found in the executor configuration. An
// assertion error is returned if the event does not carry common
// schema change details.
func (l *schemaChangerEventLogger) LogEventForSchemaChange(
	ctx context.Context, event logpb.EventPayload,
) error {
	event.CommonDetails().Timestamp = l.txn.KV().ReadTimestamp().WallTime

	withSCDetails, isSCEvent := event.(eventpb.EventWithCommonSchemaChangePayload)
	if !isSCEvent {
		return errors.AssertionFailedf("unknown event type: %T", event)
	}
	details := withSCDetails.CommonSchemaChangeDetails()
	details.InstanceID = int32(l.execCfg.NodeInfo.NodeID.SQLInstanceID())

	const depth = 1 // depth: use this function as origin
	logOpts := eventLogOptions{dst: LogEverywhere}
	return insertEventRecords(ctx, l.execCfg, l.txn, depth, logOpts, event)
}
// LogEventForJobs emits a cluster event in the context of a job.
//
// The event timestamp is set to the read timestamp of txn. The common
// job details of the event receive the job ID, the job type and
// description taken from payload, the normalized user name, the job
// status, and the descriptor IDs listed in payload (appended to those
// already present in the event). An assertion error is returned if the
// event does not carry common job details.
func LogEventForJobs(
	ctx context.Context,
	execCfg *ExecutorConfig,
	txn isql.Txn,
	event logpb.EventPayload,
	jobID int64,
	payload jobspb.Payload,
	user username.SQLUsername,
	status jobs.Status,
) error {
	event.CommonDetails().Timestamp = txn.KV().ReadTimestamp().WallTime

	withJobDetails, isJobEvent := event.(eventpb.EventWithCommonJobPayload)
	if !isJobEvent {
		return errors.AssertionFailedf("unknown event type: %T", event)
	}

	details := withJobDetails.CommonJobDetails()
	details.JobID = jobID
	details.JobType = payload.Type().String()
	details.User = user.Normalized()
	details.Status = string(status)
	descIDs := details.DescriptorIDs
	for i := range payload.DescriptorIDs {
		descIDs = append(descIDs, uint32(payload.DescriptorIDs[i]))
	}
	details.DescriptorIDs = descIDs
	details.Description = payload.Description

	// The storing of the event is delegated to the regular event logic.
	//
	// depth=1 is used because the caller of this function typically
	// wraps the call in a db.Txn() callback, which confuses the vmodule
	// filtering. The easiest is to pretend the event is sourced here.
	const depth = 1 // depth: use this function for vmodule filtering
	logOpts := eventLogOptions{dst: LogEverywhere}
	return insertEventRecords(ctx, execCfg, txn, depth, logOpts, event)
}
// eventLogSystemTableEnabled is the boolean setting
// "server.eventlog.enabled" (registered with true as its value
// argument). insertEventRecords consults it: events are only written
// to system.eventlog when the setting is enabled, in addition to the
// LogToSystemTable destination being requested.
var eventLogSystemTableEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"server.eventlog.enabled",
"if set, logged notable events are also stored in the table system.eventlog",
true,
).WithPublic()
// EventLogTestingKnobs provides hooks and knobs for event logging.
type EventLogTestingKnobs struct {
// SyncWrites causes events to be written on the same txn as
// the SQL statement that causes them. insertEventRecords only
// honors it when it is called with a non-nil txn; otherwise the
// write remains asynchronous.
SyncWrites bool
}
// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
// The method is intentionally empty.
func (*EventLogTestingKnobs) ModuleTestingKnobs() {}
// LogEventDestination is a bit set telling InsertEventRecords which
// outputs an event should be directed to. Individual destinations can
// be combined with the | operator.
type LogEventDestination int

// hasFlag reports whether d and f have at least one destination bit in
// common.
func (d LogEventDestination) hasFlag(f LogEventDestination) bool {
	common := d & f
	return common != 0
}

// Individual destinations, one bit each.
const (
	// LogToSystemTable makes InsertEventRecords write one or more
	// entries to the system eventlog table. (This behavior may be
	// removed in a later version.)
	LogToSystemTable LogEventDestination = 1 << iota

	// LogExternally makes InsertEventRecords write the event(s) to the
	// external logs.
	LogExternally

	// LogToDevChannelIfVerbose makes InsertEventRecords copy
	// the structured event to the DEV logging channel
	// if the vmodule filter for the log call is set high enough.
	LogToDevChannelIfVerbose
)

// LogEverywhere logs to all the possible outputs.
const LogEverywhere = LogToSystemTable | LogExternally | LogToDevChannelIfVerbose
// InsertEventRecords sends the given events to the destinations
// selected by dst. It is a no-op when no event is provided.
//
// No transaction is involved: the work is delegated to
// insertEventRecords() with a nil txn. An error returned by that call
// is considered a violation of its contract and results in a panic.
func InsertEventRecords(
	ctx context.Context, execCfg *ExecutorConfig, dst LogEventDestination, info ...logpb.EventPayload,
) {
	if len(info) == 0 {
		return
	}
	// depth=1 is used because the caller of this function typically
	// wraps the call in a db.Txn() callback, which confuses the vmodule
	// filtering. The easiest is to pretend the event is sourced here.
	const depth = 1 // depth: use this function
	if err := insertEventRecords(
		ctx, execCfg, nil /* txn */, depth, eventLogOptions{dst: dst}, info...,
	); err != nil {
		// By spec, it should not return an error when passed a nil txn.
		panic(errors.NewAssertionErrorWithWrappedErrf(err, "insertEventRecords returned unexpected error"))
	}
}
// insertEventRecords finalizes one or more events and routes them to
// the destinations selected by opts.dst: the DEV channel (when the
// vmodule setting matches), the external log sinks and the
// system.eventlog table.
//
// The caller is responsible for populating the timestamp field in the
// event payload and all the other per-payload specific fields. This
// function only takes care of populating the EventType field based on
// the run-time type of the event payload. An assertion error is
// returned if an event has no timestamp. An empty list of entries is a
// no-op.
//
// depth is the call depth, relative to this function, of the code
// location used for vmodule filtering and for the DEV channel copy.
//
// Events are only written to system.eventlog if LogToSystemTable is
// requested and the server.eventlog.enabled setting is on. When that
// is not the case, external logging (if requested) happens
// immediately and the function returns nil.
//
// When writing to system.eventlog:
//   - external logging (if requested) happens when the txn commits or,
//     if there is no txn, immediately;
//   - if txn is non-nil and EventLogTestingKnobs.SyncWrites is set, the
//     write is performed on the given txn object and its error, if
//     any, is returned. We need this for tests;
//   - otherwise, an asynchronous task is spawned to do the write: if
//     there's a txn, after the txn commits (i.e. we don't log if the
//     txn ends up aborting), using a txn commit trigger; otherwise
//     (no txn), immediately.
func insertEventRecords(
	ctx context.Context,
	execCfg *ExecutorConfig,
	txn isql.Txn,
	depth int,
	opts eventLogOptions,
	entries ...logpb.EventPayload,
) error {
	if len(entries) == 0 {
		// Nothing to log. Bail out early: the INSERT statement built by
		// prepareEventWrite always carries the placeholders of at least
		// one event, so it must never be issued for an empty batch.
		return nil
	}

	// Finish populating the entries.
	for i := range entries {
		// Ensure the type field is populated.
		event := entries[i]
		eventType := logpb.GetEventTypeName(event)
		event.CommonDetails().EventType = eventType

		// The caller is responsible for the timestamp field.
		if event.CommonDetails().Timestamp == 0 {
			return errors.AssertionFailedf("programming error: timestamp field in event %d not populated: %T", i, event)
		}
	}

	if opts.dst.hasFlag(LogToDevChannelIfVerbose) {
		// Emit a copy of the structured event to the DEV channel when
		// the vmodule setting matches.
		level := log.Level(2)
		if opts.verboseTraceLevel != 0 {
			// Caller has overridden the level at which to log to the
			// trace.
			level = opts.verboseTraceLevel
		}
		// Note: VDepth and InfofDepth must be called directly from this
		// function, since depth is relative to it.
		if log.VDepth(level, depth) {
			// The VDepth() call ensures that we are matching the vmodule
			// setting to where the depth is equal to 1 in the caller stack.
			for i := range entries {
				log.InfofDepth(ctx, depth, "SQL event: payload %+v", entries[i])
			}
		}
	}

	// If we only want to log externally and not write to the events table, early exit.
	loggingToSystemTable := opts.dst.hasFlag(LogToSystemTable) && eventLogSystemTableEnabled.Get(&execCfg.Settings.SV)
	if !loggingToSystemTable {
		// Simply emit the events to their respective channels and call it a day.
		if opts.dst.hasFlag(LogExternally) {
			for i := range entries {
				log.StructuredEvent(ctx, entries[i])
			}
		}
		// Not writing to system table: shortcut.
		return nil
	}

	if opts.dst.hasFlag(LogExternally) {
		if txn != nil {
			// When logging to the system table and there is a txn open
			// already, ensure that the external logging only sees the
			// event when the transaction commits.
			txn.KV().AddCommitTrigger(func(ctx context.Context) {
				for i := range entries {
					log.StructuredEvent(ctx, entries[i])
				}
			})
		} else {
			// No txn to wait for: emit the events right away, like in
			// the case where the system table is not written to. Without
			// this, a request to log externally would be silently
			// dropped whenever the system table write is enabled.
			for i := range entries {
				log.StructuredEvent(ctx, entries[i])
			}
		}
	}

	// Are we doing synchronous writes?
	syncWrites := execCfg.EventLogTestingKnobs != nil && execCfg.EventLogTestingKnobs.SyncWrites
	if txn != nil && syncWrites {
		// Yes, do it now.
		query, args, otelEvents := prepareEventWrite(ctx, execCfg, entries)
		txn.KV().AddCommitTrigger(func(ctx context.Context) { sendEventsToObsService(ctx, execCfg, otelEvents) })
		return writeToSystemEventsTable(ctx, txn, len(entries), query, args)
	}

	// No: do them async.
	if txn == nil {
		// Without txn: schedule the write now.
		asyncWriteToOtelAndSystemEventsTable(ctx, execCfg, entries)
		return nil
	}
	// With txn: trigger the async write at the end of the txn (no event
	// logged if the txn aborts).
	txn.KV().AddCommitTrigger(func(ctx context.Context) {
		asyncWriteToOtelAndSystemEventsTable(ctx, execCfg, entries)
	})
	return nil
}
// asyncWriteToOtelAndSystemEventsTable sends the given events to the
// Obs Service and inserts them into system.eventlog from an async task
// started on the stopper, so that the caller does not wait for the
// write.
//
// The task does not inherit the cancellation of ctx (only its log tags
// are carried over); it is canceled when the stopper quiesces. Each
// insertion attempt is bounded by a timeout; a failed attempt is
// reported as a warning on the OPS channel and retried, up to a
// limit. If the task cannot be started, a warning is logged and the
// events are not written.
func asyncWriteToOtelAndSystemEventsTable(
	ctx context.Context, execCfg *ExecutorConfig, entries []logpb.EventPayload,
) {
	const (
		// perAttemptTimeout is the maximum amount of time to wait on
		// each eventlog write attempt.
		perAttemptTimeout = 5 * time.Second
		// maxAttempts is the maximum number of attempts to write an
		// eventlog entry.
		// NOTE(review): the value is assigned to retry.Options.MaxRetries
		// below; confirm whether that allows maxAttempts or maxAttempts+1
		// tries.
		maxAttempts = 10
	)
	const taskName = "record-events"

	stopper := execCfg.RPCContext.Stopper

	recordEvents := func(taskCtx context.Context) {
		taskCtx, span := execCfg.AmbientCtx.AnnotateCtxWithSpan(taskCtx, taskName)
		defer span.Finish()

		// Copy the tags from the original query (which will contain
		// things like username, current internal executor context, etc).
		taskCtx = logtags.AddTags(taskCtx, logtags.FromContext(ctx))

		// Stop writing the event when the server shuts down.
		taskCtx, stopCancel := stopper.WithCancelOnQuiesce(taskCtx)
		defer stopCancel()

		// Prepare the data to send.
		query, args, otelEvents := prepareEventWrite(taskCtx, execCfg, entries)

		// Send to the Obs Service.
		sendEventsToObsService(taskCtx, execCfg, otelEvents)

		// One attempt at inserting all the events in a fresh txn.
		insertOnce := func(ctx context.Context) error {
			return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
				return writeToSystemEventsTable(ctx, txn, len(entries), query, args)
			})
		}

		// We use a retry loop in case there are transient
		// non-retriable errors on the cluster during the table write.
		// (retriable errors are already processed automatically
		// by db.Txn)
		retryOpts := base.DefaultRetryOptions()
		retryOpts.Closer = taskCtx.Done()
		retryOpts.MaxRetries = maxAttempts
		for r := retry.Start(retryOpts); r.Next(); {
			// Don't try too long to write if the system table is unavailable.
			err := contextutil.RunWithTimeout(taskCtx, taskName, perAttemptTimeout, insertOnce)
			if err == nil {
				break
			}
			log.Ops.Warningf(taskCtx, "unable to save %d entries to system.eventlog: %v", len(entries), err)
		}
	}

	// Note: we don't want to inherit the cancellation of the parent
	// context. The SQL statement that causes the eventlog entry may
	// terminate (and its context cancelled) before the eventlog entry
	// gets written.
	err := stopper.RunAsyncTask(context.Background(), taskName, recordEvents)
	if err == nil {
		return
	}
	if !errors.Is(err, stop.ErrThrottled) && !errors.Is(err, stop.ErrUnavailable) {
		// RunAsyncTask only returns an error not listed above
		// if its context was canceled, and we're using the
		// background context here.
		err = errors.NewAssertionErrorWithWrappedErrf(err, "unexpected stopper error")
	}
	log.Warningf(ctx, "failed to start task to save %d events in eventlog: %v", len(entries), err)
}
// sendEventsToObsService hands every given log record, in order, to
// the events exporter of execCfg as an obspb.EventlogEvent.
func sendEventsToObsService(
	ctx context.Context, execCfg *ExecutorConfig, events []otel_logs_pb.LogRecord,
) {
	exporter := execCfg.EventsExporter
	for idx := 0; idx < len(events); idx++ {
		exporter.SendEvent(ctx, obspb.EventlogEvent, events[idx])
	}
}
// prepareEventWrite computes everything needed to persist the given
// events:
//   - query: a single INSERT statement into system.eventlog with one
//     VALUES tuple per event;
//   - args: the placeholder values of the query, colsPerEvent values
//     per event (timestamp, event type, reporting ID, JSON info with
//     the redaction markers stripped);
//   - events: one log record per event for the Obs Service, carrying
//     the same JSON info as body, the event type as attribute, the
//     current time as timestamp, and the trace/span IDs of the tracing
//     span found in ctx, if any.
func prepareEventWrite(
	ctx context.Context, execCfg *ExecutorConfig, entries []logpb.EventPayload,
) (query string, args []interface{}, events []otel_logs_pb.LogRecord) {
	const colsPerEvent = 4
	// Note: we insert the value zero as targetID because sadly this
	// now-deprecated column has a NOT NULL constraint.
	// TODO(knz): Add a migration to remove the column altogether.
	const baseQuery = `
INSERT INTO system.eventlog (
timestamp, "eventType", "reportingID", info, "targetID"
)
VALUES($1, $2, $3, $4, 0)`

	reportingID := execCfg.NodeInfo.NodeID.SQLInstanceID()
	args = make([]interface{}, 0, len(entries)*colsPerEvent)
	events = make([]otel_logs_pb.LogRecord, len(entries))

	var (
		traceID [16]byte
		spanID  [8]byte
	)
	if sp := tracing.SpanFromContext(ctx); sp != nil {
		// Our trace IDs are 8 bytes, but OTLP insists on 16. We'll use only the
		// most significant bytes and leave the rest zero.
		//
		// NOTE(andrei): The BigEndian is an arbitrary decision; I don't know how
		// others serialize their UUIDs, but I went with the network byte order.
		binary.BigEndian.PutUint64(traceID[:], uint64(sp.TraceID()))
		binary.BigEndian.PutUint64(spanID[:], uint64(sp.SpanID()))
	}

	nowNanos := timeutil.Now().UnixNano()
	for i, event := range entries {
		jsonBytes := redact.RedactableBytes("{")
		_, jsonBytes = event.AppendJSONFields(false /* printComma */, jsonBytes)
		jsonBytes = append(jsonBytes, '}')
		// In the system.eventlog table, we do not use redaction markers.
		// (compatibility with previous versions of CockroachDB.)
		info := string(jsonBytes.StripMarkers())

		eventType := event.CommonDetails().EventType
		args = append(
			args,
			timeutil.Unix(0, event.CommonDetails().Timestamp),
			eventType,
			reportingID,
			info,
		)

		events[i] = otel_logs_pb.LogRecord{
			TimeUnixNano: uint64(nowNanos),
			Body:         &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: info}},
			Attributes: []*v1.KeyValue{{
				Key:   obspb.EventlogEventTypeAttribute,
				Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: eventType}},
			}},
			TraceId: traceID[:],
			SpanId:  spanID[:],
		}
	}

	// In the common case where we have just 1 event, the base query is
	// used as-is. This skips the extra heap allocation and buffer
	// operations of the loop below; it is an optimization.
	if len(entries) <= 1 {
		return baseQuery, args, events
	}

	// Extend the query with additional VALUES clauses for all the
	// events after the first one.
	var completeQuery strings.Builder
	completeQuery.WriteString(baseQuery)
	for n := 1; n < len(entries); n++ {
		firstPlaceholder := 1 + colsPerEvent*n
		fmt.Fprintf(&completeQuery, ", ($%d, $%d, $%d, $%d, 0)",
			firstPlaceholder, firstPlaceholder+1, firstPlaceholder+2, firstPlaceholder+3)
	}
	return completeQuery.String(), args, events
}
// writeToSystemEventsTable executes the given event insertion query,
// with its arguments, on txn using the node user session data
// override. It returns the execution error, if any, or an assertion
// error when the number of affected rows differs from numEntries.
func writeToSystemEventsTable(
	ctx context.Context, txn isql.Txn, numEntries int, query string, args []interface{},
) error {
	rowsAffected, err := txn.ExecEx(
		ctx, "log-event", txn.KV(), sessiondata.NodeUserSessionDataOverride, query, args...,
	)
	switch {
	case err != nil:
		return err
	case rowsAffected != numEntries:
		return errors.AssertionFailedf("%d rows affected by log insertion; expected %d rows affected", rowsAffected, numEntries)
	}
	return nil
}