diff --git a/pkg/bench/rttanalysis/testdata/benchmark_expectations b/pkg/bench/rttanalysis/testdata/benchmark_expectations index d71fce0faed9..e3ce20de4aa4 100644 --- a/pkg/bench/rttanalysis/testdata/benchmark_expectations +++ b/pkg/bench/rttanalysis/testdata/benchmark_expectations @@ -22,8 +22,8 @@ exp,benchmark 15,AlterTableDropConstraint/alter_table_drop_2_check_constraints 15,AlterTableDropConstraint/alter_table_drop_3_check_constraints 9,AlterTableSplit/alter_table_split_at_1_value -13,AlterTableSplit/alter_table_split_at_2_values -17,AlterTableSplit/alter_table_split_at_3_values +11,AlterTableSplit/alter_table_split_at_2_values +14,AlterTableSplit/alter_table_split_at_3_values 7,AlterTableUnsplit/alter_table_unsplit_at_1_value 9,AlterTableUnsplit/alter_table_unsplit_at_2_values 11,AlterTableUnsplit/alter_table_unsplit_at_3_values diff --git a/pkg/kv/db.go b/pkg/kv/db.go index ea5126dab707..320a38f17b54 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -292,6 +292,11 @@ func (db *DB) Context() DBContext { return db.ctx } +// NewBatch creates a new empty batch. +func (db *DB) NewBatch() *Batch { + return &Batch{} +} + // NewDB returns a new DB. func NewDB( actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock, stopper *stop.Stopper, diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 2ac26f013cc3..30aa78227553 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -123,8 +123,20 @@ func (s *Store) LogReplicaChangeTest( desc roachpb.RangeDescriptor, reason kvserverpb.RangeLogEventReason, details string, + logAsync bool, ) error { - return s.logChange(ctx, txn, changeType, replica, desc, reason, details) + return s.logChange(ctx, txn, changeType, replica, desc, reason, details, logAsync) +} + +// LogSplitTest adds a fake split event to the rangelog. 
+func (s *Store) LogSplitTest( + ctx context.Context, + txn *kv.Txn, + updatedDesc, newDesc roachpb.RangeDescriptor, + reason string, + logAsync bool, +) error { + return s.logSplit(ctx, txn, updatedDesc, newDesc, reason, logAsync) } // ReplicateQueuePurgatoryLength returns the number of replicas in replicate diff --git a/pkg/kv/kvserver/range_log.go b/pkg/kv/kvserver/range_log.go index 21523709d8b8..f8c355c9cfe0 100644 --- a/pkg/kv/kvserver/range_log.go +++ b/pkg/kv/kvserver/range_log.go @@ -14,19 +14,30 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" ) +// DBOrTxn is used to provide flexibility for logging rangelog events either +// transactionally (using a Txn), or non-transactionally (using a DB). +type DBOrTxn interface { + Run(ctx context.Context, b *kv.Batch) error + NewBatch() *kv.Batch +} + // RangeLogWriter is used to write range log events to the rangelog // table. 
type RangeLogWriter interface { - WriteRangeLogEvent(context.Context, *kv.Txn, kvserverpb.RangeLogEvent) error + WriteRangeLogEvent(context.Context, DBOrTxn, kvserverpb.RangeLogEvent) error } // wrappedRangeLogWriter implements RangeLogWriter, performing logging and @@ -55,14 +66,14 @@ func newWrappedRangeLogWriter( var _ RangeLogWriter = (*wrappedRangeLogWriter)(nil) func (w *wrappedRangeLogWriter) WriteRangeLogEvent( - ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent, + ctx context.Context, runner DBOrTxn, event kvserverpb.RangeLogEvent, ) error { maybeLogRangeLogEvent(ctx, event) if c := w.getCounter(event.EventType); c != nil { c.Inc(1) } if w.shouldWrite() && w.underlying != nil { - return w.underlying.WriteRangeLogEvent(ctx, txn, event) + return w.underlying.WriteRangeLogEvent(ctx, runner, event) } return nil } @@ -84,9 +95,13 @@ func maybeLogRangeLogEvent(ctx context.Context, event kvserverpb.RangeLogEvent) // the range which previously existed and is being split in half; the "other" // range is the new range which is being created. func (s *Store) logSplit( - ctx context.Context, txn *kv.Txn, updatedDesc, newDesc roachpb.RangeDescriptor, reason string, + ctx context.Context, + txn *kv.Txn, + updatedDesc, newDesc roachpb.RangeDescriptor, + reason string, + logAsync bool, ) error { - return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{ + logEvent := kvserverpb.RangeLogEvent{ Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()), RangeID: updatedDesc.RangeID, EventType: kvserverpb.RangeLogEventType_split, @@ -97,7 +112,9 @@ func (s *Store) logSplit( NewDesc: &newDesc, Details: reason, }, - }) + } + + return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync) } // logMerge logs a range split event into the event table. The affected range is @@ -106,9 +123,9 @@ func (s *Store) logSplit( // TODO(benesch): There are several different reasons that a range merge // could occur, and that information should be logged. 
func (s *Store) logMerge( - ctx context.Context, txn *kv.Txn, updatedLHSDesc, rhsDesc roachpb.RangeDescriptor, + ctx context.Context, txn *kv.Txn, updatedLHSDesc, rhsDesc roachpb.RangeDescriptor, logAsync bool, ) error { - return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{ + logEvent := kvserverpb.RangeLogEvent{ Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()), RangeID: updatedLHSDesc.RangeID, EventType: kvserverpb.RangeLogEventType_merge, @@ -118,7 +135,9 @@ func (s *Store) logMerge( UpdatedDesc: &updatedLHSDesc, RemovedDesc: &rhsDesc, }, - }) + } + + return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync) } // logChange logs a replica change event, which represents a replica being added @@ -133,6 +152,7 @@ func (s *Store) logChange( desc roachpb.RangeDescriptor, reason kvserverpb.RangeLogEventReason, details string, + logAsync bool, ) error { var logType kvserverpb.RangeLogEventType var info kvserverpb.RangeLogEvent_Info @@ -173,13 +193,15 @@ func (s *Store) logChange( return errors.Errorf("unknown replica change type %s", changeType) } - return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{ + logEvent := kvserverpb.RangeLogEvent{ Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()), RangeID: desc.RangeID, EventType: logType, StoreID: s.StoreID(), Info: &info, - }) + } + + return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync) } // selectEventTimestamp selects a timestamp for this log message. If the @@ -196,3 +218,49 @@ func selectEventTimestamp(s *Store, input hlc.Timestamp) time.Time { } return input.GoTime() } + +// writeToRangeLogTable writes a range-change log event to system.rangelog by +// invoking RangeLogWriter.WriteRangeLogEvent. If logAsync is false, the logging +// is done directly as part of the given transaction. 
If logAsync is true, the +// logging is done in an async task (with retries and timeouts), and that task is +// added as a commit trigger to the given transaction. +func writeToRangeLogTable( + ctx context.Context, s *Store, txn *kv.Txn, logEvent kvserverpb.RangeLogEvent, logAsync bool, +) error { + if !logAsync { + return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, logEvent) + } + + asyncLogFn := func(ctx context.Context) { + stopper := txn.DB().Context().Stopper + // Copy the tags from the original context + asyncCtx := logtags.AddTags(context.Background(), logtags.FromContext(ctx)) + // Stop writing when the server shuts down. + asyncCtx, stopCancel := stopper.WithCancelOnQuiesce(asyncCtx) + + const perAttemptTimeout = 5 * time.Second + const maxAttempts = 5 + retryOpts := base.DefaultRetryOptions() + retryOpts.Closer = asyncCtx.Done() + retryOpts.MaxRetries = maxAttempts + + if err := stopper.RunAsyncTask( + asyncCtx, "rangelog-async", func(ctx context.Context) { + defer stopCancel() + for r := retry.Start(retryOpts); r.Next(); { + if err := timeutil.RunWithTimeout(ctx, "rangelog-timeout", perAttemptTimeout, func(ctx context.Context) error { + return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn.DB(), logEvent) + }); err != nil { + log.Warningf(ctx, "error logging to system.rangelog: %v", err) + continue + } + break + } + }); err != nil { + log.Warningf(asyncCtx, "async task error while logging to system.rangelog: %v", err) + stopCancel() + } + } + txn.AddCommitTrigger(asyncLogFn) + return nil +} diff --git a/pkg/kv/kvserver/range_log_test.go b/pkg/kv/kvserver/range_log_test.go index 0cf75171ccb5..774e79de65bc 100644 --- a/pkg/kv/kvserver/range_log_test.go +++ b/pkg/kv/kvserver/range_log_test.go @@ -15,6 +15,7 @@ import ( gosql "database/sql" "encoding/json" "net/url" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -25,14 +26,27 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" 
"github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" _ "github.com/lib/pq" + "github.com/stretchr/testify/require" ) +func countEvents( + ctx context.Context, db *gosql.DB, eventType kvserverpb.RangeLogEventType, +) (int, error) { + var count int + err := db.QueryRowContext(ctx, + `SELECT count(*) FROM system.rangelog WHERE "eventType" = $1`, + eventType.String()).Scan(&count) + return count, err +} + func TestLogSplits(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -41,19 +55,9 @@ func TestLogSplits(t *testing.T) { ctx := context.Background() defer s.Stopper().Stop(ctx) - countSplits := func() int { - var count int - err := db.QueryRowContext(ctx, - `SELECT count(*) FROM system.rangelog WHERE "eventType" = $1`, - kvserverpb.RangeLogEventType_split.String()).Scan(&count) - if err != nil { - t.Fatal(err) - } - return count - } - // Count the number of split events. - initialSplits := countSplits() + initialSplits, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_split) + require.NoError(t, err) // Generate an explicit split event. if err := kvDB.AdminSplit( @@ -64,12 +68,18 @@ func TestLogSplits(t *testing.T) { t.Fatal(err) } - // Verify that the count has increased by at least one. Realistically it's - // almost always by exactly one, but if there are any other splits they - // might race in after the previous call to countSplits(). - if now := countSplits(); now <= initialSplits { - t.Fatalf("expected >= %d splits, found %d", initialSplits, now) - } + // Logging is done in an async task, so it may need some extra time to finish. 
+ testutils.SucceedsSoon(t, func() error { + currentSplits, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_split) + require.NoError(t, err) + // Verify that the count has increased by at least one. Realistically it's + // almost always by exactly one, but if there are any other splits they + // might race in after the previous call to countEvents(). + if currentSplits <= initialSplits { + return errors.Newf("expected > %d splits, found %d", initialSplits, currentSplits) + } + return nil + }) // verify that RangeID always increases (a good way to see that the splits // are logged correctly) @@ -162,20 +172,11 @@ func TestLogMerges(t *testing.T) { t.Fatal(pErr) } - countRangeLogMerges := func() int { - var count int - err := db.QueryRowContext(ctx, - `SELECT count(*) FROM system.rangelog WHERE "eventType" = $1`, - kvserverpb.RangeLogEventType_merge.String()).Scan(&count) - if err != nil { - t.Fatal(err) - } - return count - } - // No ranges should have merged immediately after startup. - if n := countRangeLogMerges(); n != 0 { - t.Fatalf("expected 0 initial merges, but got %d", n) + initialMerges, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_merge) + require.NoError(t, err) + if initialMerges != 0 { + t.Fatalf("expected 0 initial merges, but got %d", initialMerges) } if n := store.Metrics().RangeMerges.Count(); n != 0 { t.Errorf("expected 0 initial merges, but got %d", n) @@ -200,12 +201,18 @@ func TestLogMerges(t *testing.T) { t.Fatal(err) } - if n := countRangeLogMerges(); n != 1 { - t.Fatalf("expected 1 merge, but got %d", n) - } - if n := store.Metrics().RangeMerges.Count(); n != 1 { - t.Errorf("expected 1 merge, but got %d", n) - } + // Logging is done in an async task, so it may need some extra time to finish. 
+ testutils.SucceedsSoon(t, func() error { + currentMerges, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_merge) + require.NoError(t, err) + if currentMerges != 1 { + return errors.Newf("expected 1 merge, but got %d", currentMerges) + } + if n := store.Metrics().RangeMerges.Count(); n != 1 { + return errors.Newf("expected 1 merge, but got %d", n) + } + return nil + }) rows, err := db.QueryContext(ctx, `SELECT "rangeID", "otherRangeID", info FROM system.rangelog WHERE "eventType" = $1`, @@ -275,7 +282,11 @@ func TestLogRebalances(t *testing.T) { const details = "test" logEvent := func(changeType roachpb.ReplicaChangeType, reason kvserverpb.RangeLogEventReason) { if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return store.LogReplicaChangeTest(ctx, txn, changeType, desc.InternalReplicas[0], *desc, reason, details) + // Not logging async here because logging is the only part of this + // transaction. If we wanted to log async we would need to add another + // fake operation to the transaction, so it can commit and invoke the + // commit trigger, which does the logging. + return store.LogReplicaChangeTest(ctx, txn, changeType, desc.InternalReplicas[0], *desc, reason, details, false) }); err != nil { t.Fatal(err) } @@ -409,3 +420,114 @@ func TestLogRebalances(t *testing.T) { t.Errorf("expected %d RemoveReplica events logged, found %d", e, a) } } + +// TestAsyncLogging tests the logAsync flag and writeToRangeLogTable by +// attempting to log a range split with a very long reason string. The multi-MB +// string forces the logging to fail. In the case where logAsync is true, the +// logging is best-effort and the rest of the transaction commits. In the case +// where logAsync is false, the failed logging fails the entire transaction. 
+func TestAsyncLogging(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + ts := s.(*server.TestServer) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) + require.NoError(t, err) + + // Log a fake split event inside a transaction that also writes to key a. + logEvent := func(reason string, logAsync bool) error { + return kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + e := txn.Put(ctx, "a", "b") + require.NoError(t, e) + desc := roachpb.RangeDescriptor{} + return store.LogSplitTest(ctx, txn, desc, desc, reason, logAsync) + }) + } + + // A string used as the split reason; 100MB long. + longReason := strings.Repeat("r", 100*1024*1024) + initialSplits, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_split) + require.NoError(t, err) + + t.Run("failed-async-log", func(t *testing.T) { + // Logging is done in an async task, so it may need some extra time to finish. + testutils.SucceedsSoon(t, func() error { + // Start with a key-value pair a-a. + err = kvDB.Put(ctx, "a", "a") + require.NoError(t, err) + // Pass a very long reason string to the event logger to ensure logging + // fails. + err = logEvent(longReason, true) + // Logging errors are not returned here because logging runs as an async + // task. + require.NoError(t, err) + currentSplits, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_split) + require.NoError(t, err) + // The current splits should be the same as the initial splits because + // writing to the rangelog table failed. + require.Equal(t, initialSplits, currentSplits) + v, err := kvDB.Get(ctx, "a") + require.NoError(t, err) + val, _ := v.Value.GetBytes() + // The rest of the transaction is expected to have succeeded and updated the + // value for key a from a to b. 
+ require.Equal(t, "b", string(val)) + return nil + }) + }) + + t.Run("failed-sync-log", func(t *testing.T) { + // Logging is synchronous here, so the failure surfaces directly from the transaction. + testutils.SucceedsSoon(t, func() error { + // Start with a key-value pair a-a. + err = kvDB.Put(ctx, "a", "a") + require.NoError(t, err) + // Pass a very long reason string to the event logger to ensure logging + // fails. + err = logEvent(longReason, false) + // Logging is not async, so we expect to see an error here. + require.Regexp(t, "command is too large: .*", err) + currentSplits, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_split) + require.NoError(t, err) + // The current splits should be the same as the initial splits because + // writing to the rangelog table failed. + require.Equal(t, initialSplits, currentSplits) + v, err := kvDB.Get(ctx, "a") + require.NoError(t, err) + val, _ := v.Value.GetBytes() + // The entire transaction failed, so we expect to see the same value, a, for + // key a. + require.Equal(t, "a", string(val)) + return nil + }) + }) + + t.Run("successful-log", func(t *testing.T) { + // When logAsync is true, logging is done in an async task, so it may need some extra time to finish. + testutils.SucceedsSoon(t, func() error { + testutils.RunTrueAndFalse(t, "log-async", func(t *testing.T, logAsync bool) { + // Start with a key-value pair a-a. + err = kvDB.Put(ctx, "a", "a") + require.NoError(t, err) + // Log a reasonable size event, no error expected. + err = logEvent("reason", logAsync) + require.NoError(t, err) + currentSplits, err := countEvents(ctx, db, kvserverpb.RangeLogEventType_split) + require.NoError(t, err) + // Writing to rangelog succeeded, so we expect to see more splits than + // initially. + require.Greater(t, currentSplits, initialSplits) + v, err := kvDB.Get(ctx, "a") + require.NoError(t, err) + val, _ := v.Value.GetBytes() + // The rest of the transaction is expected to have succeeded and updated + // the value for key a from a to b. 
+ require.Equal(t, "b", string(val)) + }) + return nil + }) + }) +} diff --git a/pkg/kv/kvserver/rangelog/BUILD.bazel b/pkg/kv/kvserver/rangelog/BUILD.bazel index 7c9aac44c758..0b182893c017 100644 --- a/pkg/kv/kvserver/rangelog/BUILD.bazel +++ b/pkg/kv/kvserver/rangelog/BUILD.bazel @@ -8,7 +8,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/keys", - "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverpb", "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", diff --git a/pkg/kv/kvserver/rangelog/internal_executor_writer_test.go b/pkg/kv/kvserver/rangelog/internal_executor_writer_test.go index 027ce1d2e8eb..7e805e5a5354 100644 --- a/pkg/kv/kvserver/rangelog/internal_executor_writer_test.go +++ b/pkg/kv/kvserver/rangelog/internal_executor_writer_test.go @@ -16,6 +16,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -50,7 +51,7 @@ func NewInternalExecutorWriter( } func (s *InternalExecutorWriter) WriteRangeLogEvent( - ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent, + ctx context.Context, runner kvserver.DBOrTxn, event kvserverpb.RangeLogEvent, ) error { args := []interface{}{ event.Timestamp, @@ -72,6 +73,10 @@ func (s *InternalExecutorWriter) WriteRangeLogEvent( args[5] = string(infoBytes) } + txn, ok := runner.(*kv.Txn) + if !ok { + return errors.Errorf("no transaction provided") + } rows, err := s.ie.ExecEx(ctx, "log-range-event", txn, sessiondata.RootUserSessionDataOverride, s.insertQuery, args...) 
diff --git a/pkg/kv/kvserver/rangelog/rangelog.go b/pkg/kv/kvserver/rangelog/rangelog.go index 7320cf59b4ce..79fd33e35423 100644 --- a/pkg/kv/kvserver/rangelog/rangelog.go +++ b/pkg/kv/kvserver/rangelog/rangelog.go @@ -17,7 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" @@ -52,7 +52,7 @@ func newWriter(codec keys.SQLCodec, id IDGen, table catalog.TableDescriptor) *Wr // WriteRangeLogEvent implements kvserver.RangeLogWriter. It writes the event // to the system.rangelog table in the provided transaction. func (s *Writer) WriteRangeLogEvent( - ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent, + ctx context.Context, runner kvserver.DBOrTxn, event kvserverpb.RangeLogEvent, ) error { ts, err := tree.MakeDTimestampTZ(event.Timestamp, time.Microsecond) if err != nil { @@ -80,11 +80,20 @@ func (s *Writer) WriteRangeLogEvent( } args[5] = tree.NewDString(string(infoBytes)) } - ba := txn.NewBatch() + ba := runner.NewBatch() if err := s.w.Insert(ctx, ba, false /* kvTrace */, args[:]...); err != nil { return errors.NewAssertionErrorWithWrappedErrf( err, "failed to encode rangelog index entries", ) } - return txn.Run(ctx, ba) + // TODO(mira #104075): Logging to system.rangelog is async by default, which + // means a range change might succeed but the logging might fail. We should + // log a *structured* event to KV_DISTRIBUTION here as an auxiliary logging + // mechanism. Currently, we can't use kvserver.RangeLogEvent directly as a + // structured event because that will introduce an unnecessary dependency from + // eventpb to kvserver and roachpb. We should either refactor the + // RangeLogEvent struct, or create a new struct for structured logging. 
+ return runner.Run(ctx, ba) } + +var _ kvserver.RangeLogWriter = &Writer{} diff --git a/pkg/kv/kvserver/rangelog/rangelog_test.go b/pkg/kv/kvserver/rangelog/rangelog_test.go index 37db3b6b470e..6e23cef71b9d 100644 --- a/pkg/kv/kvserver/rangelog/rangelog_test.go +++ b/pkg/kv/kvserver/rangelog/rangelog_test.go @@ -245,10 +245,10 @@ type teeWriter struct { } func (t teeWriter) WriteRangeLogEvent( - ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent, + ctx context.Context, runner kvserver.DBOrTxn, event kvserverpb.RangeLogEvent, ) error { - if err := t.a.WriteRangeLogEvent(ctx, txn, event); err != nil { + if err := t.a.WriteRangeLogEvent(ctx, runner, event); err != nil { return err } - return t.b.WriteRangeLogEvent(ctx, txn, event) + return t.b.WriteRangeLogEvent(ctx, runner, event) } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 293f6693d082..198c8fc0249b 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -210,7 +210,7 @@ func splitTxnAttempt( } // Log the split into the range event log. - if err := store.logSplit(ctx, txn, *leftDesc, *rightDesc, reason); err != nil { + if err := store.logSplit(ctx, txn, *leftDesc, *rightDesc, reason, true /* logAsync */); err != nil { return err } @@ -715,11 +715,7 @@ func (r *Replica) AdminMerge( log.Infof(ctx, "initiating a merge of %s into this range (%s)", &rightDesc, reason) // Log the merge into the range event log. - // TODO(spencer): event logging API should accept a batch - // instead of a transaction; there's no reason this logging - // shouldn't be done in parallel via the batch with the updated - // range addressing. - if err := r.store.logMerge(ctx, txn, updatedLeftDesc, rightDesc); err != nil { + if err := r.store.logMerge(ctx, txn, updatedLeftDesc, rightDesc, true /* logAsync */); err != nil { return err } @@ -2418,13 +2414,13 @@ func execChangeReplicasTxn( // Log replica change into range event log. 
err = recordRangeEventsInLog( - ctx, txn, true /* added */, crt.Added(), crt.Desc, reason, details, args.logChange, + ctx, txn, true /* added */, crt.Added(), crt.Desc, reason, details, true /* logAsync */, args.logChange, ) if err != nil { return err } err = recordRangeEventsInLog( - ctx, txn, false /* added */, crt.Removed(), crt.Desc, reason, details, args.logChange, + ctx, txn, false /* added */, crt.Removed(), crt.Desc, reason, details, true /* logAsync */, args.logChange, ) if err != nil { return err @@ -2502,6 +2498,7 @@ type logChangeFn func( desc roachpb.RangeDescriptor, reason kvserverpb.RangeLogEventReason, details string, + logAsync bool, ) error func recordRangeEventsInLog( @@ -2512,6 +2509,7 @@ func recordRangeEventsInLog( rangeDesc *roachpb.RangeDescriptor, reason kvserverpb.RangeLogEventReason, details string, + logAsync bool, logChange logChangeFn, ) error { for _, repDesc := range repDescs { @@ -2529,7 +2527,7 @@ func recordRangeEventsInLog( } } if err := logChange( - ctx, txn, typ, repDesc, *rangeDesc, reason, details, + ctx, txn, typ, repDesc, *rangeDesc, reason, details, logAsync, ); err != nil { return err }