Skip to content

Commit

Permalink
kvserver: write to system.rangelog async
Browse files Browse the repository at this point in the history
Previously, writing to the system.rangelog table was done as part of the
transaction that triggered the range change (e.g. split, merge,
rebalance). If the logging failed for some reason (e.g. JSON being
logged was too large), the entire transaction needed to be retried.
This has caused at least one major incident.

This change introduces the option to write to system.rangelog asynchronously,
outside of the original transaction; this option is now also the default
for all writes to system.rangelog. When logging asynchronously, the actual
write to system.rangelog is done in an async task that is started from a
commit trigger, i.e. only after the original transaction has committed.

Epic: CRDB-16517

Fixes: cockroachdb#82538

Informs: cockroachdb#104075

Release note: None
  • Loading branch information
miraradeva committed Jun 5, 2023
1 parent c6255a1 commit 64132d3
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 70 deletions.
4 changes: 2 additions & 2 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ exp,benchmark
15,AlterTableDropConstraint/alter_table_drop_2_check_constraints
15,AlterTableDropConstraint/alter_table_drop_3_check_constraints
9,AlterTableSplit/alter_table_split_at_1_value
13,AlterTableSplit/alter_table_split_at_2_values
17,AlterTableSplit/alter_table_split_at_3_values
11,AlterTableSplit/alter_table_split_at_2_values
14,AlterTableSplit/alter_table_split_at_3_values
7,AlterTableUnsplit/alter_table_unsplit_at_1_value
9,AlterTableUnsplit/alter_table_unsplit_at_2_values
11,AlterTableUnsplit/alter_table_unsplit_at_3_values
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ func (db *DB) Context() DBContext {
return db.ctx
}

// NewBatch returns a pointer to a freshly allocated, zero-valued Batch.
// The receiver is not consulted; the method exists so that *DB can be used
// wherever a NewBatch method is required.
func (db *DB) NewBatch() *Batch {
	return new(Batch)
}

// NewDB returns a new DB.
func NewDB(
actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock, stopper *stop.Stopper,
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,20 @@ func (s *Store) LogReplicaChangeTest(
desc roachpb.RangeDescriptor,
reason kvserverpb.RangeLogEventReason,
details string,
logAsync bool,
) error {
return s.logChange(ctx, txn, changeType, replica, desc, reason, details)
return s.logChange(ctx, txn, changeType, replica, desc, reason, details, logAsync)
}

// LogSplitTest is a test-only entry point that adds a fake split event to the
// rangelog. It forwards every argument, unchanged, to Store.logSplit and
// returns whatever error logSplit returns.
func (s *Store) LogSplitTest(
	ctx context.Context,
	txn *kv.Txn,
	updatedDesc roachpb.RangeDescriptor,
	newDesc roachpb.RangeDescriptor,
	reason string,
	logAsync bool,
) error {
	return s.logSplit(ctx, txn, updatedDesc, newDesc, reason, logAsync)
}

// ReplicateQueuePurgatoryLength returns the number of replicas in replicate
Expand Down
89 changes: 78 additions & 11 deletions pkg/kv/kvserver/range_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,30 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

// DBOrTxn abstracts over the two handles a rangelog event can be written
// through: a Txn, for transactional logging, or a DB, for non-transactional
// logging. Callers obtain a batch from NewBatch and submit it with Run.
type DBOrTxn interface {
	// NewBatch returns a batch for the caller to fill in.
	NewBatch() *kv.Batch
	// Run submits the batch b and reports any error.
	Run(ctx context.Context, b *kv.Batch) error
}

// RangeLogWriter is used to write range log events to the rangelog
// table.
type RangeLogWriter interface {
// WriteRangeLogEvent writes a single range log event. The second argument is
// the handle through which the write is issued.
WriteRangeLogEvent(context.Context, *kv.Txn, kvserverpb.RangeLogEvent) error
WriteRangeLogEvent(context.Context, DBOrTxn, kvserverpb.RangeLogEvent) error
}

// wrappedRangeLogWriter implements RangeLogWriter, performing logging and
Expand Down Expand Up @@ -55,14 +66,14 @@ func newWrappedRangeLogWriter(
var _ RangeLogWriter = (*wrappedRangeLogWriter)(nil)

// WriteRangeLogEvent implements RangeLogWriter. It always passes the event to
// maybeLogRangeLogEvent and increments the counter for the event's type when
// getCounter returns one. The event is forwarded to the underlying writer only
// if shouldWrite() reports true and an underlying writer is set; in every
// other case nil is returned.
func (w *wrappedRangeLogWriter) WriteRangeLogEvent(
ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent,
ctx context.Context, runner DBOrTxn, event kvserverpb.RangeLogEvent,
) error {
maybeLogRangeLogEvent(ctx, event)
// Not every event type necessarily has a counter; skip the increment if none.
if c := w.getCounter(event.EventType); c != nil {
c.Inc(1)
}
if w.shouldWrite() && w.underlying != nil {
return w.underlying.WriteRangeLogEvent(ctx, txn, event)
return w.underlying.WriteRangeLogEvent(ctx, runner, event)
}
return nil
}
Expand All @@ -84,9 +95,13 @@ func maybeLogRangeLogEvent(ctx context.Context, event kvserverpb.RangeLogEvent)
// the range which previously existed and is being split in half; the "other"
// range is the new range which is being created.
func (s *Store) logSplit(
ctx context.Context, txn *kv.Txn, updatedDesc, newDesc roachpb.RangeDescriptor, reason string,
ctx context.Context,
txn *kv.Txn,
updatedDesc, newDesc roachpb.RangeDescriptor,
reason string,
logAsync bool,
) error {
return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{
logEvent := kvserverpb.RangeLogEvent{
Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()),
RangeID: updatedDesc.RangeID,
EventType: kvserverpb.RangeLogEventType_split,
Expand All @@ -97,7 +112,9 @@ func (s *Store) logSplit(
NewDesc: &newDesc,
Details: reason,
},
})
}

return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync)
}

// logMerge logs a range merge event into the event table. The affected range is
Expand All @@ -106,9 +123,9 @@ func (s *Store) logSplit(
// TODO(benesch): There are several different reasons that a range merge
// could occur, and that information should be logged.
func (s *Store) logMerge(
ctx context.Context, txn *kv.Txn, updatedLHSDesc, rhsDesc roachpb.RangeDescriptor,
ctx context.Context, txn *kv.Txn, updatedLHSDesc, rhsDesc roachpb.RangeDescriptor, logAsync bool,
) error {
return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{
logEvent := kvserverpb.RangeLogEvent{
Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()),
RangeID: updatedLHSDesc.RangeID,
EventType: kvserverpb.RangeLogEventType_merge,
Expand All @@ -118,7 +135,9 @@ func (s *Store) logMerge(
UpdatedDesc: &updatedLHSDesc,
RemovedDesc: &rhsDesc,
},
})
}

return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync)
}

// logChange logs a replica change event, which represents a replica being added
Expand All @@ -133,6 +152,7 @@ func (s *Store) logChange(
desc roachpb.RangeDescriptor,
reason kvserverpb.RangeLogEventReason,
details string,
logAsync bool,
) error {
var logType kvserverpb.RangeLogEventType
var info kvserverpb.RangeLogEvent_Info
Expand Down Expand Up @@ -173,13 +193,15 @@ func (s *Store) logChange(
return errors.Errorf("unknown replica change type %s", changeType)
}

return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{
logEvent := kvserverpb.RangeLogEvent{
Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()),
RangeID: desc.RangeID,
EventType: logType,
StoreID: s.StoreID(),
Info: &info,
})
}

return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync)
}

// selectEventTimestamp selects a timestamp for this log message. If the
Expand All @@ -196,3 +218,48 @@ func selectEventTimestamp(s *Store, input hlc.Timestamp) time.Time {
}
return input.GoTime()
}

// writeToRangeLogTable writes a range-change log event to system.rangelog by
// invoking RangeLogWriter.WriteRangeLogEvent.
//
// If logAsync is false, the write is issued through txn and its error is
// returned to the caller.
//
// If logAsync is true, nothing is written immediately. Instead a commit
// trigger is registered on txn; when the trigger runs it starts a
// stopper-managed async task that writes the event through txn.DB(), i.e.
// outside of txn. Each attempt is bounded by a timeout and failed attempts are
// retried according to base.DefaultRetryOptions, capped via MaxRetries.
// Failures in this mode are only logged as warnings and never surface to the
// caller, so the function always returns nil.
func writeToRangeLogTable(
	ctx context.Context, s *Store, txn *kv.Txn, logEvent kvserverpb.RangeLogEvent, logAsync bool,
) error {
	if !logAsync {
		return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, logEvent)
	}

	asyncLogFn := func(ctx context.Context) {
		stopper := txn.DB().Context().Stopper
		// Start from a fresh background context so the task is not tied to the
		// lifetime of the triggering context, but copy over its log tags.
		asyncCtx := logtags.AddTags(context.Background(), logtags.FromContext(ctx))
		// Stop writing when the server shuts down.
		asyncCtx, stopCancel := stopper.WithCancelOnQuiesce(asyncCtx)

		const perAttemptTimeout = 5 * time.Second
		const maxAttempts = 5
		retryOpts := base.DefaultRetryOptions()
		retryOpts.Closer = asyncCtx.Done()
		retryOpts.MaxRetries = maxAttempts

		if err := stopper.RunAsyncTask(
			asyncCtx, "rangelog-async", func(ctx context.Context) {
				// The task owns asyncCtx. Cancelling it from the enclosing
				// closure would fire as soon as RunAsyncTask returns (right
				// after the goroutine is spawned), which would close the
				// retry Closer and cancel the in-flight write.
				defer stopCancel()
				for r := retry.Start(retryOpts); r.Next(); {
					if err := timeutil.RunWithTimeout(ctx, "rangelog-timeout", perAttemptTimeout, func(ctx context.Context) error {
						return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn.DB(), logEvent)
					}); err != nil {
						log.Warningf(ctx, "error logging to system.rangelog: %v", err)
						continue
					}
					break
				}
			}); err != nil {
			log.Warningf(asyncCtx, "async task error while logging to system.rangelog: %v", err)
			// The task never started, so its deferred cancel will not run;
			// release the quiesce-cancel registration here instead.
			stopCancel()
		}
	}
	txn.AddCommitTrigger(asyncLogFn)
	return nil
}
Loading

0 comments on commit 64132d3

Please sign in to comment.