Skip to content

Commit

Permalink
logstore: introduce logstore.Engine
Browse files Browse the repository at this point in the history
This introduces the concept of an Engine specific for use in a log
store. In other words, it's the beginnings of a logical separation of
the state machine and log engines, i.e. cockroachdb#16624.

For now, both continue to be backed by the same storage.Engine, and
LogEngine is not correctly used in all places. For example, snapshot
application hasn't yet been updated to account for the possibility
of two separate engines and writes a set of SSTS that is atomically
ingested into the single engine currently present but which logically
spans both engines (cockroachdb#93251).

Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Dec 19, 2022
1 parent 93ed655 commit 4240b28
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/logstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "logstore",
srcs = [
"engine.go",
"logstore.go",
"sideload.go",
"sideload_disk.go",
Expand Down
35 changes: 35 additions & 0 deletions pkg/kv/kvserver/logstore/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package logstore

import "github.com/cockroachdb/cockroach/pkg/storage"

// Engine is the interface through which the log store interacts with the
// storage engine.
//
// This is identical to storage.Engine but extends it with a marker such that
// callers can be deliberate about the separation between the log engine and
// the state machine engine.
type Engine interface {
storage.Engine
logStoreEngine()
}

type logEngine struct {
storage.Engine
}

func (eng *logEngine) logStoreEngine() {}

// NewLogEngine wraps the provided storage.Engine as an Engine.
func NewLogEngine(eng storage.Engine) Engine {
return &logEngine{Engine: eng}
}
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type Metrics struct {
// LogStore is a stub of a separated Raft log storage.
type LogStore struct {
RangeID roachpb.RangeID
Engine storage.Engine
Engine Engine
Sideload SideloadStorage
StateLoader StateLoader
EntryCache *raftentry.Cache
Expand Down Expand Up @@ -275,7 +275,7 @@ func logAppend(
func LoadTerm(
ctx context.Context,
rsl StateLoader,
eng storage.Engine,
eng Engine,
rangeID roachpb.RangeID,
eCache *raftentry.Cache,
index uint64,
Expand Down Expand Up @@ -346,7 +346,7 @@ func LoadTerm(
func LoadEntries(
ctx context.Context,
rsl StateLoader,
eng storage.Engine,
eng Engine,
rangeID roachpb.RangeID,
eCache *raftentry.Cache,
sideloaded SideloadStorage,
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/logstore/sideload_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand All @@ -39,7 +38,7 @@ type DiskSideloadStorage struct {
limiter *rate.Limiter
dir string
dirCreated bool
eng storage.Engine
eng Engine
}

func deprecatedSideloadedPath(
Expand Down Expand Up @@ -68,7 +67,7 @@ func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
)
}

func exists(eng storage.Engine, path string) (bool, error) {
func exists(eng Engine, path string) (bool, error) {
_, err := eng.Stat(path)
if err == nil {
return true, nil
Expand All @@ -87,7 +86,7 @@ func NewDiskSideloadStorage(
replicaID roachpb.ReplicaID,
baseDir string,
limiter *rate.Limiter,
eng storage.Engine,
eng Engine,
) (*DiskSideloadStorage, error) {
path := deprecatedSideloadedPath(baseDir, rangeID, replicaID)
newPath := sideloadedPath(baseDir, rangeID)
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/logstore/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestSideloadingSideloadedStorage(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
t.Run("Mem", func(t *testing.T) {
eng := storage.NewDefaultInMemForTesting()
eng := NewLogEngine(storage.NewDefaultInMemForTesting())
defer eng.Close()
testSideloadingSideloadedStorage(t, eng)
})
Expand All @@ -96,7 +96,7 @@ func TestSideloadingSideloadedStorage(t *testing.T) {
})
}

func newTestingSideloadStorage(t *testing.T, eng storage.Engine) *DiskSideloadStorage {
func newTestingSideloadStorage(t *testing.T, eng Engine) *DiskSideloadStorage {
st := cluster.MakeTestingClusterSettings()
ss, err := NewDiskSideloadStorage(
st, 1, 2, filepath.Join(eng.GetAuxiliaryDir(), "fake", "testing", "dir"),
Expand All @@ -106,7 +106,7 @@ func newTestingSideloadStorage(t *testing.T, eng storage.Engine) *DiskSideloadSt
return ss
}

func testSideloadingSideloadedStorage(t *testing.T, eng storage.Engine) {
func testSideloadingSideloadedStorage(t *testing.T, eng Engine) {
ctx := context.Background()
ss := newTestingSideloadStorage(t, eng)

Expand Down Expand Up @@ -438,7 +438,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
context.Background(), tracing.NewTracer(), "test-recording")
defer getRecAndFinish()

eng := storage.NewDefaultInMemForTesting()
eng := NewLogEngine(storage.NewDefaultInMemForTesting())
defer eng.Close()
ss := newTestingSideloadStorage(t, eng)
ec := raftentry.NewCache(1024) // large enough
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
eng := storage.NewDefaultInMemForTesting()
eng := NewLogEngine(storage.NewDefaultInMemForTesting())
defer eng.Close()
sideloaded := newTestingSideloadStorage(t, eng)
postEnts, numSideloaded, size, nonSideloadedSize, err := MaybeSideloadEntries(ctx, test.preEnts, sideloaded)
Expand Down Expand Up @@ -581,7 +581,7 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
}
}

func newOnDiskEngine(ctx context.Context, t *testing.T) (func(), storage.Engine) {
func newOnDiskEngine(ctx context.Context, t *testing.T) (func(), Engine) {
dir, cleanup := testutils.TempDir(t)
eng, err := storage.Open(
ctx,
Expand All @@ -590,5 +590,5 @@ func newOnDiskEngine(ctx context.Context, t *testing.T) (func(), storage.Engine)
if err != nil {
t.Fatal(err)
}
return cleanup, eng
return cleanup, NewLogEngine(eng)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
replicaID,
ssBase,
r.store.limiters.BulkIOWriteRate,
r.store.engine,
r.store.logEngine,
); err != nil {
return errors.Wrap(err, "while initializing sideloaded storage")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// ranges, so can be passed to LogStore methods instead of being stored in it.
s := logstore.LogStore{
RangeID: r.RangeID,
Engine: r.store.engine,
Engine: r.store.logEngine,
Sideload: r.raftMu.sideloaded,
StateLoader: r.raftMu.stateLoader.StateLoader,
EntryCache: r.store.raftEntryCache,
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, e
if r.raftMu.sideloaded == nil {
return nil, errors.New("sideloaded storage is uninitialized")
}
return logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.Engine(), r.RangeID,
return logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.logEngine, r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes)
}

Expand All @@ -123,7 +123,7 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
return r.mu.lastTerm, nil
}
ctx := r.AnnotateCtx(context.TODO())
return logstore.LoadTerm(ctx, r.mu.stateLoader.StateLoader, r.store.Engine(), r.RangeID,
return logstore.LoadTerm(ctx, r.mu.stateLoader.StateLoader, r.store.logEngine, r.RangeID,
r.store.raftEntryCache, i)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestRaftSSTableSideloading(t *testing.T) {

tc.store.raftEntryCache.Clear(tc.repl.RangeID, hi)
ents, err := logstore.LoadEntries(
ctx, rsl, tc.store.Engine(), tc.repl.RangeID, tc.store.raftEntryCache,
ctx, rsl, tc.store.logEngine, tc.repl.RangeID, tc.store.raftEntryCache,
tc.repl.raftMu.sideloaded, lo, hi, math.MaxUint64,
)
require.NoError(t, err)
Expand Down
21 changes: 14 additions & 7 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
Expand Down Expand Up @@ -740,10 +741,14 @@ increasing over time (see Replica.setTombstoneKey).
NOTE: to the best of our knowledge, we don't rely on this invariant.
*/
type Store struct {
Ident *roachpb.StoreIdent // pointer to catch access before Start() is called
cfg StoreConfig
db *kv.DB
engine storage.Engine // The underlying key-value store
Ident *roachpb.StoreIdent // pointer to catch access before Start() is called
cfg StoreConfig
db *kv.DB

// TODO(sep-raft-log): use an interface with a marker method here.
engine storage.Engine // StateMachine access
logEngine logstore.Engine // LogStore access

tsCache tscache.Cache // Most recent timestamps for keys / key ranges
allocator allocatorimpl.Allocator // Makes allocation decisions
replRankings *ReplicaRankings
Expand Down Expand Up @@ -1221,9 +1226,11 @@ func NewStore(
iot := ioThresholds{}
iot.Replace(nil, 1.0) // init as empty
s := &Store{
cfg: cfg,
db: cfg.DB, // TODO(tschottdorf): remove redundancy.
engine: eng,
cfg: cfg,
db: cfg.DB, // TODO(tschottdorf): remove redundancy.
engine: eng,
// TODO(sep-raft-log): allow for this to be a separate engine (experimentally).
logEngine: logstore.NewLogEngine(eng),
nodeDesc: nodeDesc,
metrics: newStoreMetrics(cfg.HistogramWindowInterval),
ctSender: cfg.ClosedTimestampSender,
Expand Down

0 comments on commit 4240b28

Please sign in to comment.