Skip to content

Commit

Permalink
Core db simplify new api storage modes (#2075)
Browse files Browse the repository at this point in the history
* Aristo+Kvt: Fix backend `dup()` function in api setup

why:
  Backend object is subject to an inheritance cascade which was not
  taken care of, before. Only the base object was duplicated.

* Kvt: Simplify DB clone/peers management

* Aristo: Simplify DB clone/peers management

* Aristo: Adjust unit test for working with memory DB only

why:
  This currently causes some memory corruption persumably in the
  `libc` background layer.

* CoredDb+Kvt: Simplify API for KVT

why:
  Simplified storage models (was over engineered) for better performance
  and code maintenance.

* CoredDb+Aristo: Simplify API for `Aristo`

why:
  Only single database state needed here. Accessing a similar state will
  be implemented from outside this module using a context layer. This
  gives better performance and improves code maintenance.

* Fix Copyright headers

* CoreDb: Turn off API tracking

why:
  CI would ot go through. Was accidentally turned on.
  • Loading branch information
mjfh authored Mar 14, 2024
1 parent 0fb7632 commit 0d73637
Show file tree
Hide file tree
Showing 32 changed files with 790 additions and 1,173 deletions.
38 changes: 18 additions & 20 deletions nimbus/core/clique/snapshot/snapshot_desc.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2018 Status Research & Development GmbH
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
Expand Down Expand Up @@ -69,8 +69,7 @@ proc append[K,V](rw: var RlpWriter; tab: Table[K,V]) =
for key,value in tab.pairs:
rw.append((key,value))

proc read[K,V](rlp: var Rlp;
Q: type Table[K,V]): Q {.gcsafe, raises: [CatchableError].} =
proc read[K,V](rlp: var Rlp; Q: type Table[K,V]): Q {.gcsafe, raises: [RlpError].} =
for w in rlp.items:
let (key,value) = w.read((K,V))
result[key] = value
Expand Down Expand Up @@ -145,36 +144,35 @@ proc `blockHash=`*(s: Snapshot; hash: Hash256) =
proc loadSnapshot*(cfg: CliqueCfg; hash: Hash256):
Result[Snapshot,CliqueError] =
## Load an existing snapshot from the database.
var s = Snapshot(cfg: cfg)
var
s = Snapshot(cfg: cfg)
try:
let rc = s.cfg.db.newKvt(Shared).get(hash.cliqueSnapshotKey.toOpenArray)
let rc = s.cfg.db.newKvt().get(hash.cliqueSnapshotKey.toOpenArray)
if rc.isOk:
s.data = rc.value.decode(SnapshotData)
else:
if rc.error.error != KvtNotFound:
error logTxt "get() failed", error=($$rc.error)
return err((errSnapshotLoad,""))
except CatchableError as e:
except RlpError as e:
return err((errSnapshotLoad, $e.name & ": " & e.msg))
ok(s)

# clique/snapshot.go(104): func (s *Snapshot) store(db [..]
proc storeSnapshot*(cfg: CliqueCfg; s: Snapshot): CliqueOkResult =
## Insert the snapshot into the database.
try:
let
key = s.data.blockHash.cliqueSnapshotKey
val = rlp.encode(s.data)
db = s.cfg.db.newKvt(Companion)
db.put(key.toOpenArray, val).isOkOr:
error logTxt "put() failed", `error`=($$error)
db.persistent()

cfg.nSnaps.inc
cfg.snapsData += val.len.uint
except CatchableError as e:
return err((errSnapshotStore, $e.name & ": " & e.msg))

let
key = s.data.blockHash.cliqueSnapshotKey
val = rlp.encode(s.data)
db = s.cfg.db.newKvt(sharedTable = false) # bypass block chain txs
defer: db.forget()
let rc = db.put(key.toOpenArray, val)
if rc.isErr:
error logTxt "put() failed", `error`=($$rc.error)
db.persistent()

cfg.nSnaps.inc
cfg.snapsData += val.len.uint
ok()

# ------------------------------------------------------------------------------
Expand Down
37 changes: 28 additions & 9 deletions nimbus/db/aristo/aristo_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import
std/times,
eth/[common, trie/nibbles],
results,
"."/[aristo_delete, aristo_desc, aristo_desc/desc_backend, aristo_fetch,
aristo_get, aristo_hashify, aristo_hike, aristo_init, aristo_merge,
aristo_path, aristo_profile, aristo_serialise, aristo_tx, aristo_vid]
./aristo_desc/desc_backend,
./aristo_init/memory_db,
"."/[aristo_delete, aristo_desc, aristo_fetch, aristo_get, aristo_hashify,
aristo_hike, aristo_init, aristo_merge, aristo_path, aristo_profile,
aristo_serialise, aristo_tx, aristo_vid]

export
AristoDbProfListRef
Expand All @@ -27,6 +29,13 @@ const
AutoValidateApiHooks = defined(release).not
## No validatinon needed for production suite.

AristoPersistentBackendOk = false
## Set true for persistent backend profiling (which needs an extra
## link library.)

when AristoPersistentBackendOk:
import ./aristo_init/rocks_db

# Annotation helper(s)
{.pragma: noRaise, gcsafe, raises: [].}

Expand Down Expand Up @@ -451,11 +460,21 @@ when AutoValidateApiHooks:
doAssert not api.vidFetch.isNil
doAssert not api.vidDispose.isNil

proc validate(prf: AristoApiProfRef; be: BackendRef) =
proc validate(prf: AristoApiProfRef) =
prf.AristoApiRef.validate
doAssert not prf.data.isNil
if not be.isNil:
doAssert not prf.be.isNil

proc dup(be: BackendRef): BackendRef =
case be.kind:
of BackendMemory:
return MemBackendRef(be).dup

of BackendRocksDB:
when AristoPersistentBackendOk:
return RdbBackendRef(be).dup

of BackendVoid:
discard

# ------------------------------------------------------------------------------
# Public API constuctors
Expand Down Expand Up @@ -688,8 +707,8 @@ func init*(
AristoApiProfVidDisposeFn.profileRunner:
api.vidDispose(a, b)

if not be.isNil:
profApi.be = be.dup
profApi.be = be.dup()
if not profApi.be.isNil:

profApi.be.getVtxFn =
proc(a: VertexID): auto =
Expand All @@ -707,7 +726,7 @@ func init*(
result = be.putEndFn(a)

when AutoValidateApiHooks:
profApi.validate be
profApi.validate

profApi

Expand Down
117 changes: 37 additions & 80 deletions nimbus/db/aristo/aristo_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ export
aristo_constants, desc_error, desc_identifiers, desc_structural

type
AristoDudes* = HashSet[AristoDbRef]
## Descriptor peers sharing the same backend

AristoTxRef* = ref object
## Transaction descriptor
db*: AristoDbRef ## Database descriptor
Expand All @@ -55,15 +52,11 @@ type
errKey*: Blob

DudesRef = ref object
case rwOk: bool
of true:
roDudes: AristoDudes ## Read-only peers
else:
rwDb: AristoDbRef ## Link to writable descriptor

VidVtxPair* = object
vid*: VertexID ## Table lookup vertex ID (if any)
vtx*: VertexRef ## Reference to vertex
## List of peers accessing the same database. This list is layzily
## allocated and might be kept with a single entry, i.e. so that
## `{centre} == peers`.
centre: AristoDbRef ## Link to peer with write permission
peers: HashSet[AristoDbRef] ## List of all peers

AristoDbRef* = ref AristoDbObj
AristoDbObj* = object
Expand Down Expand Up @@ -147,12 +140,6 @@ func hash*(db: AristoDbRef): Hash =
## Table/KeyedQueue/HashSet mixin
cast[pointer](db).hash

func dup*(wp: VidVtxPair): VidVtxPair =
## Safe copy of `wp` argument
VidVtxPair(
vid: wp.vid,
vtx: wp.vtx.dup)

# ------------------------------------------------------------------------------
# Public functions, `dude` related
# ------------------------------------------------------------------------------
Expand All @@ -161,18 +148,15 @@ func isCentre*(db: AristoDbRef): bool =
## This function returns `true` is the argument `db` is the centre (see
## comments on `reCentre()` for details.)
##
db.dudes.isNil or db.dudes.rwOk
db.dudes.isNil or db.dudes.centre == db

func getCentre*(db: AristoDbRef): AristoDbRef =
## Get the centre descriptor among all other descriptors accessing the same
## backend database (see comments on `reCentre()` for details.)
##
if db.dudes.isNil or db.dudes.rwOk:
db
else:
db.dudes.rwDb
if db.dudes.isNil: db else: db.dudes.centre

proc reCentre*(db: AristoDbRef): Result[void,AristoError] =
proc reCentre*(db: AristoDbRef) =
## Re-focus the `db` argument descriptor so that it becomes the centre.
## Nothing is done if the `db` descriptor is the centre, already.
##
Expand All @@ -186,30 +170,12 @@ proc reCentre*(db: AristoDbRef): Result[void,AristoError] =
## accessing the same backend database. Descriptors where `isCentre()`
## returns `false` must be single destructed with `forget()`.
##
if not db.isCentre:
let parent = db.dudes.rwDb

# Steal dudes list from parent, make the rw-parent a read-only dude
db.dudes = parent.dudes
parent.dudes = DudesRef(rwOk: false, rwDb: db)

# Exclude self
db.dudes.roDudes.excl db

# Update dudes
for w in db.dudes.roDudes:
# Let all other dudes refer to this one
w.dudes.rwDb = db

# Update dudes list (parent was alredy updated)
db.dudes.roDudes.incl parent

ok()

if not db.dudes.isNil:
db.dudes.centre = db

proc fork*(
db: AristoDbRef;
rawTopLayer = false;
noTopLayer = false;
): Result[AristoDbRef,AristoError] =
## This function creates a new empty descriptor accessing the same backend
## (if any) database as the argument `db`. This new descriptor joins the
Expand All @@ -220,44 +186,45 @@ proc fork*(
## also cost computing ressources for maintaining and updating backend
## filters when writing to the backend database .
##
## If the argument `rawTopLayer` is set `true` the function will provide an
## uninitalised and inconsistent (!) top layer. This setting avoids some
## database lookup for cases where the top layer is redefined anyway.
## If the argument `noTopLayer` is set `true` the function will provide an
## uninitalised and inconsistent (!) descriptor object without top layer.
## This setting avoids some database lookup for cases where the top layer
## is redefined anyway.
##
# Make sure that there is a dudes list
if db.dudes.isNil:
db.dudes = DudesRef(centre: db, peers: @[db].toHashSet)

let clone = AristoDbRef(
top: LayerRef.init(),
dudes: db.dudes,
backend: db.backend)

if not rawTopLayer:
if not noTopLayer:
clone.top = LayerRef.init()
let rc = clone.backend.getIdgFn()
if rc.isOk:
clone.top.final.vGen = rc.value
elif rc.error != GetIdgNotFound:
return err(rc.error)

# Update dudes list
if db.dudes.isNil:
clone.dudes = DudesRef(rwOk: false, rwDb: db)
db.dudes = DudesRef(rwOk: true, roDudes: [clone].toHashSet)
else:
let parent = if db.dudes.rwOk: db else: db.dudes.rwDb
clone.dudes = DudesRef(rwOk: false, rwDb: parent)
parent.dudes.roDudes.incl clone
# Add to peer list of clones
db.dudes.peers.incl clone

ok clone

iterator forked*(db: AristoDbRef): AristoDbRef =
## Interate over all non centre descriptors (see comments on `reCentre()`
## for details.)
if not db.dudes.isNil:
for dude in db.getCentre.dudes.roDudes.items:
yield dude
for dude in db.getCentre.dudes.peers.items:
if dude != db.dudes.centre:
yield dude

func nForked*(db: AristoDbRef): int =
## Returns the number of non centre descriptors (see comments on `reCentre()`
## for details.) This function is a fast version of `db.forked.toSeq.len`.
if not db.dudes.isNil:
return db.getCentre.dudes.roDudes.len
return db.dudes.peers.len - 1


proc forget*(db: AristoDbRef): Result[void,AristoError] =
Expand All @@ -267,31 +234,21 @@ proc forget*(db: AristoDbRef): Result[void,AristoError] =
## A non centre descriptor should always be destructed after use (see also
## comments on `fork()`.)
##
if not db.isNil:
if db.isCentre:
return err(NotAllowedOnCentre)

# Unlink argument `db`
let parent = db.dudes.rwDb
if parent.dudes.roDudes.len < 2:
parent.dudes = DudesRef(nil)
else:
parent.dudes.roDudes.excl db

# Clear descriptor so it would not do harm if used wrongly
db[] = AristoDbObj(top: LayerRef.init())
ok()
if db.isCentre:
err(NotAllowedOnCentre)
elif db notin db.dudes.peers:
err(StaleDescriptor)
else:
db.dudes.peers.excl db # Unlink argument `db` from peers list
ok()

proc forgetOthers*(db: AristoDbRef): Result[void,AristoError] =
## For the centre argument `db` descriptor (see comments on `reCentre()`
## for details), destruct all other descriptors accessing the same backend.
##
if not db.isCentre:
return err(MustBeOnCentre)

if not db.dudes.isNil:
for dude in db.dudes.roDudes.items:
dude[] = AristoDbObj(top: LayerRef.init())
if db.dudes.centre != db:
return err(MustBeOnCentre)

db.dudes = DudesRef(nil)
ok()
Expand Down
35 changes: 17 additions & 18 deletions nimbus/db/aristo/aristo_desc/desc_backend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ type

# -------------

BackendRef* = ref object of RootRef
BackendRef* = ref BackendObj
BackendObj* = object of RootObj
## Backend interface.
filters*: QidSchedRef ## Filter slot queue state

Expand All @@ -120,23 +121,21 @@ type

closeFn*: CloseFn ## Generic destructor

func dup*(be: BackendRef): BackendRef =
if not be.isNil:
result = BackendRef(
filters: be.filters,
getVtxFn: be.getVtxFn,
getKeyFn: be.getKeyFn,
getFilFn: be.getFilFn,
getIdgFn: be.getIdgFn,
getFqsFn: be.getFqsFn,
putBegFn: be.putBegFn,
putVtxFn: be.putVtxFn,
putKeyFn: be.putKeyFn,
putFilFn: be.putFilFn,
putIdgFn: be.putIdgFn,
putFqsFn: be.putFqsFn,
putEndFn: be.putEndFn,
closeFn: be.closeFn)
proc init*(trg: var BackendObj; src: BackendObj) =
trg.filters = src.filters
trg.getVtxFn = src.getVtxFn
trg.getKeyFn = src.getKeyFn
trg.getFilFn = src.getFilFn
trg.getIdgFn = src.getIdgFn
trg.getFqsFn = src.getFqsFn
trg.putBegFn = src.putBegFn
trg.putVtxFn = src.putVtxFn
trg.putKeyFn = src.putKeyFn
trg.putFilFn = src.putFilFn
trg.putIdgFn = src.putIdgFn
trg.putFqsFn = src.putFqsFn
trg.putEndFn = src.putEndFn
trg.closeFn = src.closeFn

# ------------------------------------------------------------------------------
# End
Expand Down
Loading

0 comments on commit 0d73637

Please sign in to comment.