Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core db simplify new api storage modes #2075

Merged
merged 8 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions nimbus/core/clique/snapshot/snapshot_desc.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2018 Status Research & Development GmbH
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
Expand Down Expand Up @@ -69,8 +69,7 @@ proc append[K,V](rw: var RlpWriter; tab: Table[K,V]) =
for key,value in tab.pairs:
rw.append((key,value))

proc read[K,V](rlp: var Rlp;
Q: type Table[K,V]): Q {.gcsafe, raises: [CatchableError].} =
proc read[K,V](rlp: var Rlp; Q: type Table[K,V]): Q {.gcsafe, raises: [RlpError].} =
for w in rlp.items:
let (key,value) = w.read((K,V))
result[key] = value
Expand Down Expand Up @@ -145,36 +144,35 @@ proc `blockHash=`*(s: Snapshot; hash: Hash256) =
proc loadSnapshot*(cfg: CliqueCfg; hash: Hash256):
Result[Snapshot,CliqueError] =
## Load an existing snapshot from the database.
var s = Snapshot(cfg: cfg)
var
s = Snapshot(cfg: cfg)
try:
let rc = s.cfg.db.newKvt(Shared).get(hash.cliqueSnapshotKey.toOpenArray)
let rc = s.cfg.db.newKvt().get(hash.cliqueSnapshotKey.toOpenArray)
if rc.isOk:
s.data = rc.value.decode(SnapshotData)
else:
if rc.error.error != KvtNotFound:
error logTxt "get() failed", error=($$rc.error)
return err((errSnapshotLoad,""))
except CatchableError as e:
except RlpError as e:
return err((errSnapshotLoad, $e.name & ": " & e.msg))
ok(s)

# clique/snapshot.go(104): func (s *Snapshot) store(db [..]
proc storeSnapshot*(cfg: CliqueCfg; s: Snapshot): CliqueOkResult =
## Insert the snapshot into the database.
try:
let
key = s.data.blockHash.cliqueSnapshotKey
val = rlp.encode(s.data)
db = s.cfg.db.newKvt(Companion)
db.put(key.toOpenArray, val).isOkOr:
error logTxt "put() failed", `error`=($$error)
db.persistent()

cfg.nSnaps.inc
cfg.snapsData += val.len.uint
except CatchableError as e:
return err((errSnapshotStore, $e.name & ": " & e.msg))

let
key = s.data.blockHash.cliqueSnapshotKey
val = rlp.encode(s.data)
db = s.cfg.db.newKvt(sharedTable = false) # bypass block chain txs
defer: db.forget()
let rc = db.put(key.toOpenArray, val)
if rc.isErr:
error logTxt "put() failed", `error`=($$rc.error)
db.persistent()

cfg.nSnaps.inc
cfg.snapsData += val.len.uint
ok()

# ------------------------------------------------------------------------------
Expand Down
37 changes: 28 additions & 9 deletions nimbus/db/aristo/aristo_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import
std/times,
eth/[common, trie/nibbles],
results,
"."/[aristo_delete, aristo_desc, aristo_desc/desc_backend, aristo_fetch,
aristo_get, aristo_hashify, aristo_hike, aristo_init, aristo_merge,
aristo_path, aristo_profile, aristo_serialise, aristo_tx, aristo_vid]
./aristo_desc/desc_backend,
./aristo_init/memory_db,
"."/[aristo_delete, aristo_desc, aristo_fetch, aristo_get, aristo_hashify,
aristo_hike, aristo_init, aristo_merge, aristo_path, aristo_profile,
aristo_serialise, aristo_tx, aristo_vid]

export
AristoDbProfListRef
Expand All @@ -27,6 +29,13 @@ const
AutoValidateApiHooks = defined(release).not
## No validatinon needed for production suite.

AristoPersistentBackendOk = false
## Set true for persistent backend profiling (which needs an extra
## link library.)

when AristoPersistentBackendOk:
import ./aristo_init/rocks_db

# Annotation helper(s)
{.pragma: noRaise, gcsafe, raises: [].}

Expand Down Expand Up @@ -451,11 +460,21 @@ when AutoValidateApiHooks:
doAssert not api.vidFetch.isNil
doAssert not api.vidDispose.isNil

proc validate(prf: AristoApiProfRef; be: BackendRef) =
proc validate(prf: AristoApiProfRef) =
prf.AristoApiRef.validate
doAssert not prf.data.isNil
if not be.isNil:
doAssert not prf.be.isNil

proc dup(be: BackendRef): BackendRef =
case be.kind:
of BackendMemory:
return MemBackendRef(be).dup

of BackendRocksDB:
when AristoPersistentBackendOk:
return RdbBackendRef(be).dup

of BackendVoid:
discard

# ------------------------------------------------------------------------------
# Public API constuctors
Expand Down Expand Up @@ -688,8 +707,8 @@ func init*(
AristoApiProfVidDisposeFn.profileRunner:
api.vidDispose(a, b)

if not be.isNil:
profApi.be = be.dup
profApi.be = be.dup()
if not profApi.be.isNil:

profApi.be.getVtxFn =
proc(a: VertexID): auto =
Expand All @@ -707,7 +726,7 @@ func init*(
result = be.putEndFn(a)

when AutoValidateApiHooks:
profApi.validate be
profApi.validate

profApi

Expand Down
117 changes: 37 additions & 80 deletions nimbus/db/aristo/aristo_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ export
aristo_constants, desc_error, desc_identifiers, desc_structural

type
AristoDudes* = HashSet[AristoDbRef]
## Descriptor peers sharing the same backend

AristoTxRef* = ref object
## Transaction descriptor
db*: AristoDbRef ## Database descriptor
Expand All @@ -55,15 +52,11 @@ type
errKey*: Blob

DudesRef = ref object
case rwOk: bool
of true:
roDudes: AristoDudes ## Read-only peers
else:
rwDb: AristoDbRef ## Link to writable descriptor

VidVtxPair* = object
vid*: VertexID ## Table lookup vertex ID (if any)
vtx*: VertexRef ## Reference to vertex
## List of peers accessing the same database. This list is layzily
## allocated and might be kept with a single entry, i.e. so that
## `{centre} == peers`.
centre: AristoDbRef ## Link to peer with write permission
peers: HashSet[AristoDbRef] ## List of all peers

AristoDbRef* = ref AristoDbObj
AristoDbObj* = object
Expand Down Expand Up @@ -147,12 +140,6 @@ func hash*(db: AristoDbRef): Hash =
## Table/KeyedQueue/HashSet mixin
cast[pointer](db).hash

func dup*(wp: VidVtxPair): VidVtxPair =
## Safe copy of `wp` argument
VidVtxPair(
vid: wp.vid,
vtx: wp.vtx.dup)

# ------------------------------------------------------------------------------
# Public functions, `dude` related
# ------------------------------------------------------------------------------
Expand All @@ -161,18 +148,15 @@ func isCentre*(db: AristoDbRef): bool =
## This function returns `true` is the argument `db` is the centre (see
## comments on `reCentre()` for details.)
##
db.dudes.isNil or db.dudes.rwOk
db.dudes.isNil or db.dudes.centre == db

func getCentre*(db: AristoDbRef): AristoDbRef =
## Get the centre descriptor among all other descriptors accessing the same
## backend database (see comments on `reCentre()` for details.)
##
if db.dudes.isNil or db.dudes.rwOk:
db
else:
db.dudes.rwDb
if db.dudes.isNil: db else: db.dudes.centre

proc reCentre*(db: AristoDbRef): Result[void,AristoError] =
proc reCentre*(db: AristoDbRef) =
## Re-focus the `db` argument descriptor so that it becomes the centre.
## Nothing is done if the `db` descriptor is the centre, already.
##
Expand All @@ -186,30 +170,12 @@ proc reCentre*(db: AristoDbRef): Result[void,AristoError] =
## accessing the same backend database. Descriptors where `isCentre()`
## returns `false` must be single destructed with `forget()`.
##
if not db.isCentre:
let parent = db.dudes.rwDb

# Steal dudes list from parent, make the rw-parent a read-only dude
db.dudes = parent.dudes
parent.dudes = DudesRef(rwOk: false, rwDb: db)

# Exclude self
db.dudes.roDudes.excl db

# Update dudes
for w in db.dudes.roDudes:
# Let all other dudes refer to this one
w.dudes.rwDb = db

# Update dudes list (parent was alredy updated)
db.dudes.roDudes.incl parent

ok()

if not db.dudes.isNil:
db.dudes.centre = db

proc fork*(
db: AristoDbRef;
rawTopLayer = false;
noTopLayer = false;
): Result[AristoDbRef,AristoError] =
## This function creates a new empty descriptor accessing the same backend
## (if any) database as the argument `db`. This new descriptor joins the
Expand All @@ -220,44 +186,45 @@ proc fork*(
## also cost computing ressources for maintaining and updating backend
## filters when writing to the backend database .
##
## If the argument `rawTopLayer` is set `true` the function will provide an
## uninitalised and inconsistent (!) top layer. This setting avoids some
## database lookup for cases where the top layer is redefined anyway.
## If the argument `noTopLayer` is set `true` the function will provide an
## uninitalised and inconsistent (!) descriptor object without top layer.
## This setting avoids some database lookup for cases where the top layer
## is redefined anyway.
##
# Make sure that there is a dudes list
if db.dudes.isNil:
db.dudes = DudesRef(centre: db, peers: @[db].toHashSet)

let clone = AristoDbRef(
top: LayerRef.init(),
dudes: db.dudes,
backend: db.backend)

if not rawTopLayer:
if not noTopLayer:
clone.top = LayerRef.init()
let rc = clone.backend.getIdgFn()
if rc.isOk:
clone.top.final.vGen = rc.value
elif rc.error != GetIdgNotFound:
return err(rc.error)

# Update dudes list
if db.dudes.isNil:
clone.dudes = DudesRef(rwOk: false, rwDb: db)
db.dudes = DudesRef(rwOk: true, roDudes: [clone].toHashSet)
else:
let parent = if db.dudes.rwOk: db else: db.dudes.rwDb
clone.dudes = DudesRef(rwOk: false, rwDb: parent)
parent.dudes.roDudes.incl clone
# Add to peer list of clones
db.dudes.peers.incl clone

ok clone

iterator forked*(db: AristoDbRef): AristoDbRef =
## Interate over all non centre descriptors (see comments on `reCentre()`
## for details.)
if not db.dudes.isNil:
for dude in db.getCentre.dudes.roDudes.items:
yield dude
for dude in db.getCentre.dudes.peers.items:
if dude != db.dudes.centre:
yield dude

func nForked*(db: AristoDbRef): int =
## Returns the number of non centre descriptors (see comments on `reCentre()`
## for details.) This function is a fast version of `db.forked.toSeq.len`.
if not db.dudes.isNil:
return db.getCentre.dudes.roDudes.len
return db.dudes.peers.len - 1


proc forget*(db: AristoDbRef): Result[void,AristoError] =
Expand All @@ -267,31 +234,21 @@ proc forget*(db: AristoDbRef): Result[void,AristoError] =
## A non centre descriptor should always be destructed after use (see also
## comments on `fork()`.)
##
if not db.isNil:
if db.isCentre:
return err(NotAllowedOnCentre)

# Unlink argument `db`
let parent = db.dudes.rwDb
if parent.dudes.roDudes.len < 2:
parent.dudes = DudesRef(nil)
else:
parent.dudes.roDudes.excl db

# Clear descriptor so it would not do harm if used wrongly
db[] = AristoDbObj(top: LayerRef.init())
ok()
if db.isCentre:
err(NotAllowedOnCentre)
elif db notin db.dudes.peers:
err(StaleDescriptor)
else:
db.dudes.peers.excl db # Unlink argument `db` from peers list
ok()

proc forgetOthers*(db: AristoDbRef): Result[void,AristoError] =
## For the centre argument `db` descriptor (see comments on `reCentre()`
## for details), destruct all other descriptors accessing the same backend.
##
if not db.isCentre:
return err(MustBeOnCentre)

if not db.dudes.isNil:
for dude in db.dudes.roDudes.items:
dude[] = AristoDbObj(top: LayerRef.init())
if db.dudes.centre != db:
return err(MustBeOnCentre)

db.dudes = DudesRef(nil)
ok()
Expand Down
35 changes: 17 additions & 18 deletions nimbus/db/aristo/aristo_desc/desc_backend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ type

# -------------

BackendRef* = ref object of RootRef
BackendRef* = ref BackendObj
BackendObj* = object of RootObj
## Backend interface.
filters*: QidSchedRef ## Filter slot queue state

Expand All @@ -120,23 +121,21 @@ type

closeFn*: CloseFn ## Generic destructor

func dup*(be: BackendRef): BackendRef =
if not be.isNil:
result = BackendRef(
filters: be.filters,
getVtxFn: be.getVtxFn,
getKeyFn: be.getKeyFn,
getFilFn: be.getFilFn,
getIdgFn: be.getIdgFn,
getFqsFn: be.getFqsFn,
putBegFn: be.putBegFn,
putVtxFn: be.putVtxFn,
putKeyFn: be.putKeyFn,
putFilFn: be.putFilFn,
putIdgFn: be.putIdgFn,
putFqsFn: be.putFqsFn,
putEndFn: be.putEndFn,
closeFn: be.closeFn)
proc init*(trg: var BackendObj; src: BackendObj) =
trg.filters = src.filters
trg.getVtxFn = src.getVtxFn
trg.getKeyFn = src.getKeyFn
trg.getFilFn = src.getFilFn
trg.getIdgFn = src.getIdgFn
trg.getFqsFn = src.getFqsFn
trg.putBegFn = src.putBegFn
trg.putVtxFn = src.putVtxFn
trg.putKeyFn = src.putKeyFn
trg.putFilFn = src.putFilFn
trg.putIdgFn = src.putIdgFn
trg.putFqsFn = src.putFqsFn
trg.putEndFn = src.putEndFn
trg.closeFn = src.closeFn

# ------------------------------------------------------------------------------
# End
Expand Down
Loading