Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: write to system.rangelog async #102813

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ exp,benchmark
15,AlterTableDropConstraint/alter_table_drop_2_check_constraints
15,AlterTableDropConstraint/alter_table_drop_3_check_constraints
9,AlterTableSplit/alter_table_split_at_1_value
13,AlterTableSplit/alter_table_split_at_2_values
17,AlterTableSplit/alter_table_split_at_3_values
11,AlterTableSplit/alter_table_split_at_2_values
14,AlterTableSplit/alter_table_split_at_3_values
7,AlterTableUnsplit/alter_table_unsplit_at_1_value
9,AlterTableUnsplit/alter_table_unsplit_at_2_values
11,AlterTableUnsplit/alter_table_unsplit_at_3_values
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ func (db *DB) Context() DBContext {
return db.ctx
}

// NewBatch creates a new empty batch.
func (db *DB) NewBatch() *Batch {
return &Batch{}
}

// NewDB returns a new DB.
func NewDB(
actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock, stopper *stop.Stopper,
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,20 @@ func (s *Store) LogReplicaChangeTest(
desc roachpb.RangeDescriptor,
reason kvserverpb.RangeLogEventReason,
details string,
logAsync bool,
) error {
return s.logChange(ctx, txn, changeType, replica, desc, reason, details)
return s.logChange(ctx, txn, changeType, replica, desc, reason, details, logAsync)
}

// LogSplitTest adds a fake split event to the rangelog.
func (s *Store) LogSplitTest(
ctx context.Context,
txn *kv.Txn,
updatedDesc, newDesc roachpb.RangeDescriptor,
reason string,
logAsync bool,
) error {
return s.logSplit(ctx, txn, updatedDesc, newDesc, reason, logAsync)
}

// ReplicateQueuePurgatoryLength returns the number of replicas in replicate
Expand Down
90 changes: 79 additions & 11 deletions pkg/kv/kvserver/range_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,30 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

// DBOrTxn is used to provide flexibility for logging rangelog events either
// transactionally (using a Txn), or non-transactionally (using a DB).
type DBOrTxn interface {
Run(ctx context.Context, b *kv.Batch) error
NewBatch() *kv.Batch
}

// RangeLogWriter is used to write range log events to the rangelog
// table.
type RangeLogWriter interface {
WriteRangeLogEvent(context.Context, *kv.Txn, kvserverpb.RangeLogEvent) error
WriteRangeLogEvent(context.Context, DBOrTxn, kvserverpb.RangeLogEvent) error
}

// wrappedRangeLogWriter implements RangeLogWriter, performing logging and
Expand Down Expand Up @@ -55,14 +66,14 @@ func newWrappedRangeLogWriter(
var _ RangeLogWriter = (*wrappedRangeLogWriter)(nil)

func (w *wrappedRangeLogWriter) WriteRangeLogEvent(
ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent,
ctx context.Context, runner DBOrTxn, event kvserverpb.RangeLogEvent,
) error {
maybeLogRangeLogEvent(ctx, event)
if c := w.getCounter(event.EventType); c != nil {
c.Inc(1)
}
if w.shouldWrite() && w.underlying != nil {
return w.underlying.WriteRangeLogEvent(ctx, txn, event)
return w.underlying.WriteRangeLogEvent(ctx, runner, event)
}
return nil
}
Expand All @@ -84,9 +95,13 @@ func maybeLogRangeLogEvent(ctx context.Context, event kvserverpb.RangeLogEvent)
// the range which previously existed and is being split in half; the "other"
// range is the new range which is being created.
func (s *Store) logSplit(
ctx context.Context, txn *kv.Txn, updatedDesc, newDesc roachpb.RangeDescriptor, reason string,
ctx context.Context,
txn *kv.Txn,
updatedDesc, newDesc roachpb.RangeDescriptor,
reason string,
logAsync bool,
) error {
return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{
logEvent := kvserverpb.RangeLogEvent{
Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()),
RangeID: updatedDesc.RangeID,
EventType: kvserverpb.RangeLogEventType_split,
Expand All @@ -97,7 +112,9 @@ func (s *Store) logSplit(
NewDesc: &newDesc,
Details: reason,
},
})
}

return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync)
}

// logMerge logs a range split event into the event table. The affected range is
Expand All @@ -106,9 +123,9 @@ func (s *Store) logSplit(
// TODO(benesch): There are several different reasons that a range merge
// could occur, and that information should be logged.
func (s *Store) logMerge(
ctx context.Context, txn *kv.Txn, updatedLHSDesc, rhsDesc roachpb.RangeDescriptor,
ctx context.Context, txn *kv.Txn, updatedLHSDesc, rhsDesc roachpb.RangeDescriptor, logAsync bool,
) error {
return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{
logEvent := kvserverpb.RangeLogEvent{
Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()),
RangeID: updatedLHSDesc.RangeID,
EventType: kvserverpb.RangeLogEventType_merge,
Expand All @@ -118,7 +135,9 @@ func (s *Store) logMerge(
UpdatedDesc: &updatedLHSDesc,
RemovedDesc: &rhsDesc,
},
})
}

return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync)
}

// logChange logs a replica change event, which represents a replica being added
Expand All @@ -133,6 +152,7 @@ func (s *Store) logChange(
desc roachpb.RangeDescriptor,
reason kvserverpb.RangeLogEventReason,
details string,
logAsync bool,
) error {
var logType kvserverpb.RangeLogEventType
var info kvserverpb.RangeLogEvent_Info
Expand Down Expand Up @@ -173,13 +193,15 @@ func (s *Store) logChange(
return errors.Errorf("unknown replica change type %s", changeType)
}

return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{
logEvent := kvserverpb.RangeLogEvent{
Timestamp: selectEventTimestamp(s, txn.ReadTimestamp()),
RangeID: desc.RangeID,
EventType: logType,
StoreID: s.StoreID(),
Info: &info,
})
}

return writeToRangeLogTable(ctx, s, txn, logEvent, logAsync)
}

// selectEventTimestamp selects a timestamp for this log message. If the
Expand All @@ -196,3 +218,49 @@ func selectEventTimestamp(s *Store, input hlc.Timestamp) time.Time {
}
return input.GoTime()
}

// writeToRangeLogTable writes a range-change log event to system.rangelog by
// invoking RangeLogWriter.WriteRangeLogEvent. If logAsync is false, the logging
// is done directly as part of the given transaction. If logAsync is true, the
// logging is done in an async task (with retries and timeouts), and that task is
// added as a commit trigger to the given transaction.
func writeToRangeLogTable(
ctx context.Context, s *Store, txn *kv.Txn, logEvent kvserverpb.RangeLogEvent, logAsync bool,
) error {
if !logAsync {
return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn, logEvent)
}

asyncLogFn := func(ctx context.Context) {
stopper := txn.DB().Context().Stopper
// Copy the tags from the original context
asyncCtx := logtags.AddTags(context.Background(), logtags.FromContext(ctx))
// Stop writing when the server shuts down.
asyncCtx, stopCancel := stopper.WithCancelOnQuiesce(asyncCtx)

const perAttemptTimeout = 5 * time.Second
const maxAttempts = 5
retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = asyncCtx.Done()
retryOpts.MaxRetries = maxAttempts

if err := stopper.RunAsyncTask(
asyncCtx, "rangelog-async", func(ctx context.Context) {
defer stopCancel()
for r := retry.Start(retryOpts); r.Next(); {
if err := timeutil.RunWithTimeout(ctx, "rangelog-timeout", perAttemptTimeout, func(ctx context.Context) error {
return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn.DB(), logEvent)
}); err != nil {
log.Warningf(ctx, "error logging to system.rangelog: %v", err)
continue
}
break
}
}); err != nil {
log.Warningf(asyncCtx, "async task error while logging to system.rangelog: %v", err)
stopCancel()
}
}
txn.AddCommitTrigger(asyncLogFn)
return nil
}
Loading