Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
93495: logstore: pool allocations for logAppend r=nvanbenschoten a=tbg

This adds a benchmark appending a preallocated single 256kb entry to the log, and then optimizes it.

    $ benchdiff ./pkg/kv/kvserver/logstore --count 5 --old ... --post-checkout './dev gen go'
    name                                    old time/op    new time/op    delta
    LogStore_StoreEntries/bytes=256_KiB-24     140µs ± 8%      49µs ± 2%   -64.96%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=512_KiB-24     295µs ± 5%     137µs ± 2%   -53.44%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=1.0_KiB-24    2.35µs ± 2%    1.71µs ± 1%   -27.31%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=2.0_MiB-24    3.49ms ± 5%    2.68ms ± 5%   -23.08%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=1.0_MiB-24    1.43ms ± 5%    1.15ms ± 5%   -19.80%  (p=0.008 n=5+5)

    name                                    old alloc/op   new alloc/op   delta
    LogStore_StoreEntries/bytes=256_KiB-24     274kB ± 0%       0kB ±22%   -99.91%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=512_KiB-24     555kB ± 0%       1kB ± 6%   -99.87%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=1.0_KiB-24    1.31kB ± 0%    0.16kB ± 0%   -88.05%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=1.0_MiB-24    3.50MB ± 1%    2.23MB ± 1%   -36.26%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=2.0_MiB-24    6.97MB ± 1%    4.64MB ± 1%   -33.45%  (p=0.008 n=5+5)

    name                                    old allocs/op  new allocs/op  delta
    LogStore_StoreEntries/bytes=1.0_KiB-24      1.00 ± 0%      0.00       -100.00%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=256_KiB-24      1.00 ± 0%      0.00       -100.00%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=512_KiB-24      1.00 ± 0%      0.00       -100.00%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=1.0_MiB-24      5.00 ± 0%      4.00 ± 0%   -20.00%  (p=0.008 n=5+5)
    LogStore_StoreEntries/bytes=2.0_MiB-24      6.00 ± 0%      5.00 ± 0%      ~     (p=0.079 n=4+5)

Epic: none
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Dec 18, 2022
2 parents 93c070a + bf41206 commit 51eb14b
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 8 deletions.
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/logstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ go_library(

go_test(
name = "logstore_test",
srcs = ["sideload_test.go"],
srcs = [
"logstore_bench_test.go",
"sideload_test.go",
],
args = ["-test.timeout=295s"],
embed = [":logstore"],
deps = [
Expand All @@ -51,8 +54,10 @@ go_test(
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/testutils",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
Expand Down
29 changes: 22 additions & 7 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package logstore

import (
"context"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -102,18 +103,26 @@ type LogStore struct {
Metrics Metrics
}

func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
// Use an unindexed batch because we don't need to read our writes, and
// it is more efficient.
return eng.NewUnindexedBatch(false /* writeOnly */)
}

// StoreEntries persists newly appended Raft log Entries to the log storage.
// Accepts the state of the log before the operation, returns the state after.
// Persists HardState atomically with, or strictly after Entries.
func (s *LogStore) StoreEntries(
ctx context.Context, state RaftState, rd Ready, stats *AppendStats,
) (RaftState, error) {
// TODO(pavelkalinnikov): Doesn't this comment contradict the code?
// Use a more efficient write-only batch because we don't need to do any
// reads from the batch. Any reads are performed on the underlying DB.
batch := s.Engine.NewUnindexedBatch(false /* writeOnly */)
batch := newStoreEntriesBatch(s.Engine)
defer batch.Close()
return s.storeEntriesAndCommitBatch(ctx, state, rd, stats, batch)
}

func (s *LogStore) storeEntriesAndCommitBatch(
ctx context.Context, state RaftState, rd Ready, stats *AppendStats, batch storage.Batch,
) (RaftState, error) {
prevLastIndex := state.LastIndex
if len(rd.Entries) > 0 {
stats.Begin = timeutil.Now()
Expand Down Expand Up @@ -211,6 +220,10 @@ func (s *LogStore) StoreEntries(
return state, nil
}

var valPool = sync.Pool{
New: func() interface{} { return &roachpb.Value{} },
}

// logAppend adds the given entries to the raft log. Takes the previous log
// state, and returns the updated state. It's the caller's responsibility to
// maintain exclusive access to the raft log for the duration of the method
Expand All @@ -230,7 +243,9 @@ func logAppend(
return prev, nil
}
var diff enginepb.MVCCStats
var value roachpb.Value
value := valPool.Get().(*roachpb.Value)
value.RawBytes = value.RawBytes[:0]
defer valPool.Put(value)
for i := range entries {
ent := &entries[i]
key := keys.RaftLogKeyFromPrefix(raftLogPrefix, ent.Index)
Expand All @@ -241,9 +256,9 @@ func logAppend(
value.InitChecksum(key)
var err error
if ent.Index > prev.LastIndex {
err = storage.MVCCBlindPut(ctx, rw, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */)
err = storage.MVCCBlindPut(ctx, rw, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, *value, nil /* txn */)
} else {
err = storage.MVCCPut(ctx, rw, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */)
err = storage.MVCCPut(ctx, rw, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, *value, nil /* txn */)
}
if err != nil {
return RaftState{}, err
Expand Down
106 changes: 106 additions & 0 deletions pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package logstore

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3/raftpb"
)

type discardBatch struct {
storage.Batch
}

func (b *discardBatch) Commit(bool) error {
return nil
}

func BenchmarkLogStore_StoreEntries(b *testing.B) {
defer log.Scope(b).Close(b)
const kb = 1 << 10
const mb = 1 << 20
for _, bytes := range []int64{1 * kb, 256 * kb, 512 * kb, 1 * mb, 2 * mb} {
b.Run(fmt.Sprintf("bytes=%s", humanizeutil.IBytes(bytes)), func(b *testing.B) {
runBenchmarkLogStore_StoreEntries(b, bytes)
})
}
}

func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
const tenMB = 10 * 1 << 20
ec := raftentry.NewCache(tenMB)
const rangeID = 1
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
s := LogStore{
RangeID: rangeID,
Engine: eng,
StateLoader: NewStateLoader(rangeID),
EntryCache: ec,
Settings: cluster.MakeTestingClusterSettings(),
Metrics: Metrics{
RaftLogCommitLatency: metric.NewHistogram(metric.Metadata{}, 10*time.Second, metric.IOLatencyBuckets),
},
}

ctx := context.Background()
rs := RaftState{
LastTerm: 1,
ByteSize: 0,
}
var ents []raftpb.Entry
data := make([]byte, bytes)
rand.New(rand.NewSource(0)).Read(data)
ents = append(ents, raftpb.Entry{
Term: 1,
Index: 1,
Type: raftpb.EntryNormal,
Data: kvserverbase.EncodeRaftCommand(kvserverbase.RaftVersionStandard, "deadbeef", data),
})
stats := &AppendStats{}

b.ReportAllocs()
b.ResetTimer()
// Use a batch that ignores Commit() so that we can run as many iterations as
// we like without building up large amounts of data in the Engine. This hides
// some allocations that would occur down in pebble but that's fine, we're not
// here to measure those.
batch := &discardBatch{}
for i := 0; i < b.N; i++ {
batch.Batch = newStoreEntriesBatch(eng)
rd := Ready{
HardState: raftpb.HardState{},
Entries: ents,
MustSync: true,
}
var err error
rs, err = s.storeEntriesAndCommitBatch(ctx, rs, rd, stats, batch)
if err != nil {
b.Fatal(err)
}
batch.Batch.Close()
ents[0].Index++
}
require.EqualValues(b, b.N, rs.LastIndex)
}

0 comments on commit 51eb14b

Please sign in to comment.