Skip to content

Commit

Permalink
Merge pull request #2762 from oasislabs/kostko/feature/mkvs-badger-ma…
Browse files Browse the repository at this point in the history
…naged

go/storage/mkvs: Use Badger to manage versions
  • Loading branch information
kostko authored Mar 16, 2020
2 parents 8001e4d + 928c10f commit e7d25bf
Show file tree
Hide file tree
Showing 17 changed files with 704 additions and 761 deletions.
5 changes: 5 additions & 0 deletions .changelog/2674.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/storage/mkvs: Use Badger to manage versions

By restricting how Prune behaves (it can now only remove the earliest round)
we can leverage Badger's managed mode to have it manage versions for us. This
avoids the need to track node lifetimes separately.
2 changes: 1 addition & 1 deletion go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/cznic/strutil v0.0.0-20181122101858-275e90344537 // indirect
github.com/dgraph-io/badger/v2 v2.0.0
github.com/dgraph-io/badger/v2 v2.0.1
github.com/eapache/channels v1.1.0
github.com/eapache/queue v1.1.0 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f h1:6itBiEUtu+gOzXZWn46bM5/qm8LlV6/byR7Yflx/y6M=
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
github.com/dgraph-io/badger/v2 v2.0.0 h1:Cr05o2TUd2IcLbEY0aGd8mbjm1YyQpy+dswo3BcDXrE=
github.com/dgraph-io/badger/v2 v2.0.0/go.mod h1:YoRSIp1LmAJ7zH7tZwRvjNMUYLxB4wl3ebYkaIruZ04=
github.com/dgraph-io/badger/v2 v2.0.1 h1:+D6dhIqC6jIeCclnxMHqk4HPuXgrRN5UfBsLR4dNQ3A=
github.com/dgraph-io/badger/v2 v2.0.1/go.mod h1:YoRSIp1LmAJ7zH7tZwRvjNMUYLxB4wl3ebYkaIruZ04=
github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e h1:aeUNgwup7PnDOBAD1BOKAqzb/W/NksOj6r3dwKKuqfg=
github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down
35 changes: 18 additions & 17 deletions go/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ var (
)

// Config is the storage backend configuration.
type Config struct {
type Config struct { // nolint: maligned
// Backend is the database backend.
Backend string

Expand All @@ -96,14 +96,22 @@ type Config struct {

// MaxCacheSize is the maximum in-memory cache size for the database.
MaxCacheSize int64

// DiscardWriteLogs will cause all write logs to be discarded.
DiscardWriteLogs bool

// NoFsync will disable fsync() where possible.
NoFsync bool
}

// ToNodeDB converts from a Config to a node DB Config.
func (cfg *Config) ToNodeDB() *nodedb.Config {
return &nodedb.Config{
DB: cfg.DB,
Namespace: cfg.Namespace,
MaxCacheSize: cfg.MaxCacheSize,
DB: cfg.DB,
Namespace: cfg.Namespace,
MaxCacheSize: cfg.MaxCacheSize,
NoFsync: cfg.NoFsync,
DiscardWriteLogs: cfg.DiscardWriteLogs,
}
}

Expand Down Expand Up @@ -205,6 +213,9 @@ type ProofResponse = syncer.ProofResponse
// Proof is a Merkle proof for a subtree.
type Proof = syncer.Proof

// NodeDB is a node database.
type NodeDB = nodedb.NodeDB

// ApplyOp is an apply operation within a batch of apply operations.
type ApplyOp struct {
// SrcRound is the source root round.
Expand Down Expand Up @@ -327,21 +338,11 @@ type Backend interface {
type LocalBackend interface {
Backend

// HasRoot checks if the storage backend contains the specified storage root.
HasRoot(root Root) bool

// Finalize finalizes the specified round. The passed list of roots are the
// roots within the round that have been finalized. All non-finalized roots
// can be discarded.
Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error

// Prune removes all roots recorded under the given namespace and round.
//
// Returns the number of pruned nodes.
Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error)

// Checkpointer returns the checkpoint creator/restorer for this storage backend.
Checkpointer() checkpoint.CreateRestorer

// NodeDB returns the underlying node database.
NodeDB() nodedb.NodeDB
}

// ClientBackend is a storage client backend implementation.
Expand Down
17 changes: 4 additions & 13 deletions go/storage/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"path/filepath"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/storage/api"
Expand Down Expand Up @@ -202,18 +201,10 @@ func (ba *databaseBackend) GetCheckpointChunk(ctx context.Context, chunk *checkp
return ba.checkpointer.GetCheckpointChunk(ctx, chunk, w)
}

func (ba *databaseBackend) HasRoot(root api.Root) bool {
return ba.nodedb.HasRoot(root)
}

func (ba *databaseBackend) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error {
return ba.nodedb.Finalize(ctx, namespace, round, roots)
}

func (ba *databaseBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) {
return ba.nodedb.Prune(ctx, namespace, round)
}

func (ba *databaseBackend) Checkpointer() checkpoint.CreateRestorer {
return ba.checkpointer
}

func (ba *databaseBackend) NodeDB() nodedb.NodeDB {
return ba.nodedb
}
1 change: 1 addition & 0 deletions go/storage/database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func doTestImpl(t *testing.T, backend string) {
ApplyLockLRUSlots: 100,
Namespace: testNs,
MaxCacheSize: 16 * 1024 * 1024,
NoFsync: true,
}
err error
)
Expand Down
61 changes: 5 additions & 56 deletions go/storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
"github.com/oasislabs/oasis-core/go/common/node"
"github.com/oasislabs/oasis-core/go/storage/api"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
Expand Down Expand Up @@ -43,26 +41,12 @@ var (
},
[]string{"call"},
)
storagePrunedCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "oasis_storage_pruned",
Help: "Number of pruned nodes.",
},
)
storageFinalizedCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "oasis_storage_finalized",
Help: "Number of finalized rounds.",
},
)

storageCollectors = []prometheus.Collector{
storageFailures,
storageCalls,
storageLatency,
storageValueSize,
storagePrunedCount,
storageFinalizedCount,
}

labelApply = prometheus.Labels{"call": "apply"}
Expand All @@ -72,9 +56,6 @@ var (
labelSyncGet = prometheus.Labels{"call": "sync_get"}
labelSyncGetPrefixes = prometheus.Labels{"call": "sync_get_prefixes"}
labelSyncIterate = prometheus.Labels{"call": "sync_iterate"}
labelHasRoot = prometheus.Labels{"call": "has_root"}
labelFinalize = prometheus.Labels{"call": "finalize"}
labelPrune = prometheus.Labels{"call": "prune"}

_ api.LocalBackend = (*metricsWrapper)(nil)
_ api.ClientBackend = (*metricsWrapper)(nil)
Expand Down Expand Up @@ -198,52 +179,20 @@ func (w *metricsWrapper) SyncIterate(ctx context.Context, request *api.IterateRe
return res, err
}

func (w *metricsWrapper) HasRoot(root api.Root) bool {
localBackend, ok := w.Backend.(api.LocalBackend)
if !ok {
return false
}
start := time.Now()
flag := localBackend.HasRoot(root)
storageLatency.With(labelHasRoot).Observe(time.Since(start).Seconds())
storageCalls.With(labelHasRoot).Inc()
return flag
}

func (w *metricsWrapper) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error {
localBackend, ok := w.Backend.(api.LocalBackend)
if !ok {
return api.ErrUnsupported
}
start := time.Now()
err := localBackend.Finalize(ctx, namespace, round, roots)
storageLatency.With(labelFinalize).Observe(time.Since(start).Seconds())
storageCalls.With(labelFinalize).Inc()
if err == nil {
storageFinalizedCount.Inc()
}
return err
}

func (w *metricsWrapper) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) {
func (w *metricsWrapper) Checkpointer() checkpoint.CreateRestorer {
localBackend, ok := w.Backend.(api.LocalBackend)
if !ok {
return 0, api.ErrUnsupported
return nil
}
start := time.Now()
pruned, err := localBackend.Prune(ctx, namespace, round)
storageLatency.With(labelPrune).Observe(time.Since(start).Seconds())
storageCalls.With(labelPrune).Inc()
storagePrunedCount.Add(float64(pruned))
return pruned, err
return localBackend.Checkpointer()
}

func (w *metricsWrapper) Checkpointer() checkpoint.CreateRestorer {
func (w *metricsWrapper) NodeDB() api.NodeDB {
localBackend, ok := w.Backend.(api.LocalBackend)
if !ok {
return nil
}
return localBackend.Checkpointer()
return localBackend.NodeDB()
}

func newMetricsWrapper(base api.Backend) api.Backend {
Expand Down
2 changes: 1 addition & 1 deletion go/storage/mkvs/urkel/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (c *cache) remoteSync(ctx context.Context, ptr *node.Pointer, fetcher readS
if c.persistEverythingFromSyncer {
// NOTE: This is a dummy batch, we assume that the node database backend is a
// cache-only backend and does not care about correct values.
batch = c.db.NewBatch(c.syncRoot, false)
batch = c.db.NewBatch(c.syncRoot, c.syncRoot.Round, false)
dbSubtree = batch.MaybeStartSubtree(nil, 0, subtree)
}

Expand Down
2 changes: 1 addition & 1 deletion go/storage/mkvs/urkel/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestFileCheckpointCreator(t *testing.T) {
err = rs.RestoreChunk(ctx, cm, &buf)
require.NoError(err, "RestoreChunk")
}
err = ndb2.Finalize(ctx, root.Namespace, root.Round, []hash.Hash{root.Hash})
err = ndb2.Finalize(ctx, root.Round, []hash.Hash{root.Hash})
require.NoError(err, "Finalize")

// Verify that everything has been restored.
Expand Down
2 changes: 1 addition & 1 deletion go/storage/mkvs/urkel/checkpoint/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func restoreChunk(ctx context.Context, ndb db.NodeDB, chunk *ChunkMetadata, r io
}
emptyRoot.Hash.Empty()

batch := ndb.NewBatch(emptyRoot, true)
batch := ndb.NewBatch(emptyRoot, chunk.Root.Round, true)
defer batch.Reset()

subtree := batch.MaybeStartSubtree(nil, 0, ptr)
Expand Down
2 changes: 1 addition & 1 deletion go/storage/mkvs/urkel/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (t *tree) commitWithHooks(
oldRoot.Round = round
}

batch := t.cache.db.NewBatch(oldRoot, false)
batch := t.cache.db.NewBatch(oldRoot, round, false)
defer batch.Reset()

subtree := batch.MaybeStartSubtree(nil, 0, t.cache.pendingRoot)
Expand Down
56 changes: 44 additions & 12 deletions go/storage/mkvs/urkel/db/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,26 @@ var (
// ErrBadNamespace indicates that the passed namespace does not match what is
// actually contained within the database.
ErrBadNamespace = errors.New(ModuleName, 10, "urkel: bad namespace")
// ErrNotEarliest indicates that the given round is not the earliest round.
ErrNotEarliest = errors.New(ModuleName, 11, "urkel: round is not the earliest round")
)

// Config is the node database backend configuration.
type Config struct {
type Config struct { // nolint: maligned
// DB is the path to the database.
DB string

// DebugNoFsync will disable fsync() where possible.
DebugNoFsync bool
// NoFsync will disable fsync() where possible.
NoFsync bool

// Namespace is the namespace contained within the database.
Namespace common.Namespace

// MaxCacheSize is the maximum in-memory cache size for the database.
MaxCacheSize int64

// DiscardWriteLogs will cause all write logs to be discarded.
DiscardWriteLogs bool
}

// NodeDB is the persistence layer used for persisting the in-memory tree.
Expand All @@ -66,26 +71,38 @@ type NodeDB interface {
// GetWriteLog retrieves a write log between two storage instances from the database.
GetWriteLog(ctx context.Context, startRoot node.Root, endRoot node.Root) (writelog.Iterator, error)

// GetLatestRound returns the most recent round in the node database.
GetLatestRound(ctx context.Context) (uint64, error)

// GetEarliestRound returns the earliest round in the node database.
GetEarliestRound(ctx context.Context) (uint64, error)

// GetRootsForRound returns a list of roots stored under the given round.
GetRootsForRound(ctx context.Context, round uint64) ([]hash.Hash, error)

// NewBatch starts a new batch.
//
// The chunk argument specifies whether the given batch is being used to import a chunk of an
// existing root. Chunks may contain unresolved pointers (e.g., pointers that point to hashes
// which are not present in the database). Committing a chunk batch will prevent the round from
// being finalized.
NewBatch(oldRoot node.Root, chunk bool) Batch
NewBatch(oldRoot node.Root, round uint64, chunk bool) Batch

// HasRoot checks whether the given root exists.
HasRoot(root node.Root) bool

// Finalize finalizes the specified round. The passed list of roots are the
// roots within the round that have been finalized. All non-finalized roots
// can be discarded.
Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error
Finalize(ctx context.Context, round uint64, roots []hash.Hash) error

// Prune removes all roots recorded under the given namespace and round.
// Prune removes all roots recorded under the given round.
//
// Returns the number of pruned nodes.
Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error)
// Only the earliest round can be pruned, passing any other round will result in an error.
Prune(ctx context.Context, round uint64) error

// Size returns the size of the database in bytes.
Size() (int64, error)

// Close closes the database.
Close()
Expand Down Expand Up @@ -168,19 +185,34 @@ func (d *nopNodeDB) GetWriteLog(ctx context.Context, startRoot node.Root, endRoo
return nil, ErrWriteLogNotFound
}

func (d *nopNodeDB) GetLatestRound(ctx context.Context) (uint64, error) {
return 0, nil
}

func (d *nopNodeDB) GetEarliestRound(ctx context.Context) (uint64, error) {
return 0, nil
}

func (d *nopNodeDB) GetRootsForRound(ctx context.Context, round uint64) ([]hash.Hash, error) {
return nil, nil
}

func (d *nopNodeDB) HasRoot(root node.Root) bool {
return false
}

func (d *nopNodeDB) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error {
func (d *nopNodeDB) Finalize(ctx context.Context, round uint64, roots []hash.Hash) error {
return nil
}

func (d *nopNodeDB) Prune(ctx context.Context, round uint64) error {
return nil
}

func (d *nopNodeDB) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) {
func (d *nopNodeDB) Size() (int64, error) {
return 0, nil
}

// Close is a no-op.
func (d *nopNodeDB) Close() {
}

Expand All @@ -189,7 +221,7 @@ type nopBatch struct {
BaseBatch
}

func (d *nopNodeDB) NewBatch(oldRoot node.Root, chunk bool) Batch {
func (d *nopNodeDB) NewBatch(oldRoot node.Root, round uint64, chunk bool) Batch {
return &nopBatch{}
}

Expand Down
Loading

0 comments on commit e7d25bf

Please sign in to comment.