Skip to content

Commit

Permalink
first batch of work towards the VC/BN split:
Browse files Browse the repository at this point in the history
- we have a new binary which connects via RPC to the respective BN and has an internal clock - waking it up on every slot
- the BN has a new option called --external-validators and currently in order to have the VC binaries to run we need to pass EXTERNAL_VALIDATORS=yes to make
- factored some code out of beacon_node.nim for easier reuse in validator_api.nim and validator_client.nim
- the VC loads its associated private keys from the datadir for its BN
- most of the validator API calls have been implemented as a stub.
- the VC polls its BN at the start of each epoch - getting a list of all active validators for the current epoch - and then continues to request blocks and sign them with its appropriate validators when necessary
  • Loading branch information
onqtam committed May 25, 2020
1 parent f41d823 commit 8760494
Show file tree
Hide file tree
Showing 19 changed files with 694 additions and 252 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ build/
# ntags/ctags output
/tags

# vscode
.vscode

# Ignore dynamic, static libs and libtool archive files
*.so
*.dylib
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ BUILD_SYSTEM_DIR := vendor/nimbus-build-system

# unconditionally built by the default Make target
TOOLS := \
validator_client \
beacon_node \
inspector \
logtrace \
Expand Down
182 changes: 10 additions & 172 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import
# Nimble packages
stew/[objects, bitseqs, byteutils], stew/shims/macros,
chronos, confutils, metrics, json_rpc/[rpcserver, jsonmarshal],
chronicles, chronicles/helpers as chroniclesHelpers,
chronicles,
json_serialization/std/[options, sets, net], serialization/errors,
eth/db/kvstore, eth/db/kvstore_sqlite3,
eth/p2p/enode, eth/[keys, async_utils], eth/p2p/discoveryv5/[protocol, enr],
Expand All @@ -22,14 +22,14 @@ import
spec/presets/custom,
conf, time, beacon_chain_db, validator_pool, extras,
attestation_pool, block_pool, eth2_network, eth2_discovery,
beacon_node_common, beacon_node_types, sszdump,
beacon_node_common, beacon_node_types,
nimbus_binary_common,
mainchain_monitor, version, ssz, ssz/dynamic_navigator,
sync_protocol, request_manager, validator_keygen, interop, statusbar,
sync_manager, state_transition,
validator_duties
validator_duties, validator_api

const
genesisFile = "genesis.ssz"
hasPrompt = not defined(withoutPrompt)

type
Expand All @@ -55,68 +55,12 @@ declareGauge beacon_head_slot,
# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received,
"Number of beacon chain attestations received by this peer"
declareCounter beacon_blocks_received,
"Number of beacon chain blocks received by this peer"

declareHistogram beacon_attestation_received_seconds_from_slot_start,
"Interval between slot start and attestation receival", buckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]

logScope: topics = "beacnde"

proc onBeaconBlock*(node: BeaconNode, signedBlock: SignedBeaconBlock) {.gcsafe.}

proc getStateFromSnapshot(conf: BeaconNodeConf): NilableBeaconStateRef =
var
genesisPath = conf.dataDir/genesisFile
snapshotContents: TaintedString
writeGenesisFile = false

if conf.stateSnapshot.isSome:
let
snapshotPath = conf.stateSnapshot.get.string
snapshotExt = splitFile(snapshotPath).ext

if cmpIgnoreCase(snapshotExt, ".ssz") != 0:
error "The supplied state snapshot must be a SSZ file",
suppliedPath = snapshotPath
quit 1

snapshotContents = readFile(snapshotPath)
if fileExists(genesisPath):
let genesisContents = readFile(genesisPath)
if snapshotContents != genesisContents:
error "Data directory not empty. Existing genesis state differs from supplied snapshot",
dataDir = conf.dataDir.string, snapshot = snapshotPath
quit 1
else:
debug "No previous genesis state. Importing snapshot",
genesisPath, dataDir = conf.dataDir.string
writeGenesisFile = true
genesisPath = snapshotPath
else:
try:
snapshotContents = readFile(genesisPath)
except CatchableError as err:
error "Failed to read genesis file", err = err.msg
quit 1

result = try:
newClone(SSZ.decode(snapshotContents, BeaconState))
except SerializationError:
error "Failed to import genesis file", path = genesisPath
quit 1

info "Loaded genesis state", path = genesisPath

if writeGenesisFile:
try:
notice "Writing genesis to data directory", path = conf.dataDir/genesisFile
writeFile(conf.dataDir/genesisFile, snapshotContents.string)
except CatchableError as err:
error "Failed to persist genesis file to data dir",
err = err.msg, genesisFile = conf.dataDir/genesisFile
quit 1

proc enrForkIdFromState(state: BeaconState): ENRForkID =
let
forkVer = state.fork.current_version
Expand Down Expand Up @@ -256,72 +200,6 @@ proc connectToNetwork(node: BeaconNode) {.async.} =
let addressFile = node.config.dataDir / "beacon_node.address"
writeFile(addressFile, node.network.announcedENR.toURI)


proc onAttestation(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it
# yet - in particular, we haven't verified that it belongs to particular chain
# we're on, or that it follows the rules of the protocol
logScope: pcs = "on_attestation"

let
wallSlot = node.beaconClock.now().toSlot()
head = node.blockPool.head

debug "Attestation received",
attestation = shortLog(attestation),
headRoot = shortLog(head.blck.root),
headSlot = shortLog(head.blck.slot),
wallSlot = shortLog(wallSlot.slot),
cat = "consensus" # Tag "consensus|attestation"?

if not wallSlot.afterGenesis or wallSlot.slot < head.blck.slot:
warn "Received attestation before genesis or head - clock is wrong?",
afterGenesis = wallSlot.afterGenesis,
wallSlot = shortLog(wallSlot.slot),
headSlot = shortLog(head.blck.slot),
cat = "clock_drift" # Tag "attestation|clock_drift"?
return

if attestation.data.slot > head.blck.slot and
(attestation.data.slot - head.blck.slot) > MaxEmptySlotCount:
warn "Ignoring attestation, head block too old (out of sync?)",
attestationSlot = attestation.data.slot, headSlot = head.blck.slot
return

node.attestationPool.add(attestation)

proc storeBlock(
node: BeaconNode, signedBlock: SignedBeaconBlock): Result[void, BlockError] =
let blockRoot = hash_tree_root(signedBlock.message)
debug "Block received",
signedBlock = shortLog(signedBlock.message),
blockRoot = shortLog(blockRoot),
cat = "block_listener",
pcs = "receive_block"

if node.config.dumpEnabled:
dump(node.config.dumpDir / "incoming", signedBlock, blockRoot)

beacon_blocks_received.inc()
discard ? node.blockPool.add(blockRoot, signedBlock)

# The block we received contains attestations, and we might not yet know about
# all of them. Let's add them to the attestation pool - in case they block
# is not yet resolved, neither will the attestations be!
# But please note that we only care about recent attestations.
# TODO shouldn't add attestations if the block turns out to be invalid..
let currentSlot = node.beaconClock.now.toSlot
if currentSlot.afterGenesis and
signedBlock.message.slot.epoch + 1 >= currentSlot.slot.epoch:
for attestation in signedBlock.message.body.attestations:
node.onAttestation(attestation)
ok()

proc onBeaconBlock(node: BeaconNode, signedBlock: SignedBeaconBlock) =
# We received a block but don't know much about it yet - in particular, we
# don't know if it's part of the chain we're currently building.
discard node.storeBlock(signedBlock)

func verifyFinalization(node: BeaconNode, slot: Slot) =
# Epoch must be >= 4 to check finalization
const SETTLING_TIME_OFFSET = 1'u64
Expand Down Expand Up @@ -567,16 +445,6 @@ proc runSyncLoop(node: BeaconNode) {.async.} =

await syncman.sync()

# TODO: Should we move these to other modules?
# This would require moving around other type definitions
proc installValidatorApiHandlers(rpcServer: RpcServer, node: BeaconNode) =
discard

func slotOrZero(time: BeaconTime): Slot =
let exSlot = time.toSlot
if exSlot.afterGenesis: exSlot.slot
else: Slot(0)

proc currentSlot(node: BeaconNode): Slot =
node.beaconClock.now.slotOrZero

Expand Down Expand Up @@ -683,7 +551,9 @@ proc installDebugApiHandlers(rpcServer: RpcServer, node: BeaconNode) =
return res

proc installRpcHandlers(rpcServer: RpcServer, node: BeaconNode) =
rpcServer.installValidatorApiHandlers(node)
# TODO: remove this if statement later - here just to test the config option for now
if node.config.externalValidators:
rpcServer.installValidatorApiHandlers(node)
rpcServer.installBeaconApiHandlers(node)
rpcServer.installDebugApiHandlers(node)

Expand Down Expand Up @@ -754,9 +624,7 @@ proc run*(node: BeaconNode) =
installAttestationHandlers(node)

let
t = node.beaconClock.now().toSlot()
curSlot = if t.afterGenesis: t.slot
else: GENESIS_SLOT
curSlot = node.beaconClock.now().slotOrZero()
nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
fromNow = saturate(node.beaconClock.fromNow(nextSlot))

Expand Down Expand Up @@ -954,30 +822,7 @@ programMain:
banner = clientId & "\p" & copyrights & "\p\p" & nimBanner
config = BeaconNodeConf.load(version = banner, copyrightBanner = banner)

when compiles(defaultChroniclesStream.output.writer):
defaultChroniclesStream.output.writer =
proc (logLevel: LogLevel, msg: LogOutputStr) {.gcsafe, raises: [Defect].} =
try:
stdout.write(msg)
except IOError as err:
logLoggingFailure(cstring(msg), err)

randomize()

try:
let directives = config.logLevel.split(";")
try:
setLogLevel(parseEnum[LogLevel](directives[0]))
except ValueError:
raise (ref ValueError)(msg: "Please specify one of TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL")

if directives.len > 1:
for topicName, settings in parseTopicDirectives(directives[1..^1]):
if not setTopicState(topicName, settings.state, settings.logLevel):
warn "Unrecognized logging topic", topic = topicName
except ValueError as err:
stderr.write "Invalid value for --log-level. " & err.msg
quit 1
setupMainProc(config.logLevel)

case config.cmd
of createTestnet:
Expand Down Expand Up @@ -1063,14 +908,7 @@ programMain:

var node = waitFor BeaconNode.init(config)

## Ctrl+C handling
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
info "Shutting down after having received SIGINT"
status = BeaconNodeStatus.Stopping
setControlCHook(controlCHandler)
ctrlCHandling: status = BeaconNodeStatus.Stopping

when hasPrompt:
initPrompt(node)
Expand Down
84 changes: 81 additions & 3 deletions beacon_chain/beacon_node_common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@
# Common routines for a BeaconNode and a BeaconValidator node

import
# Standard library
os, tables,

# Nimble packages
chronos, json_rpc/rpcserver, metrics,
chronicles,

# Local modules
spec/[datatypes, crypto],
conf, time, beacon_chain_db,
spec/[datatypes, crypto, helpers],
conf, time, beacon_chain_db, sszdump,
attestation_pool, block_pool, eth2_network,
beacon_node_types, mainchain_monitor, request_manager

# TODO figure out how to silence the `unused pragma` warning for specific builds of this
# https://discordapp.com/channels/613988663034118151/614014714590134292/713053239297179668
import spec/digest

type
RpcServer* = RpcHttpServer

Expand All @@ -38,12 +46,82 @@ type
topicAggregateAndProofs*: string
syncLoop*: Future[void]

const MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT

# Metrics
declareGauge beacon_head_root,
"Root of the head block of the beacon chain"

# Metrics for tracking attestation and beacon block loss
declareCounter beacon_blocks_received,
"Number of beacon chain blocks received by this peer"

proc onAttestation*(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it
# yet - in particular, we haven't verified that it belongs to particular chain
# we're on, or that it follows the rules of the protocol
logScope: pcs = "on_attestation"

let
wallSlot = node.beaconClock.now().toSlot()
head = node.blockPool.head

debug "Attestation received",
attestation = shortLog(attestation),
headRoot = shortLog(head.blck.root),
headSlot = shortLog(head.blck.slot),
wallSlot = shortLog(wallSlot.slot),
cat = "consensus" # Tag "consensus|attestation"?

if not wallSlot.afterGenesis or wallSlot.slot < head.blck.slot:
warn "Received attestation before genesis or head - clock is wrong?",
afterGenesis = wallSlot.afterGenesis,
wallSlot = shortLog(wallSlot.slot),
headSlot = shortLog(head.blck.slot),
cat = "clock_drift" # Tag "attestation|clock_drift"?
return

if attestation.data.slot > head.blck.slot and
(attestation.data.slot - head.blck.slot) > MaxEmptySlotCount:
warn "Ignoring attestation, head block too old (out of sync?)",
attestationSlot = attestation.data.slot, headSlot = head.blck.slot
return

node.attestationPool.add(attestation)

proc storeBlock*(
node: BeaconNode, signedBlock: SignedBeaconBlock): Result[void, BlockError] =
let blockRoot = hash_tree_root(signedBlock.message)
debug "Block received",
signedBlock = shortLog(signedBlock.message),
blockRoot = shortLog(blockRoot),
cat = "block_listener",
pcs = "receive_block"

if node.config.dumpEnabled:
dump(node.config.dumpDir / "incoming", signedBlock, blockRoot)

beacon_blocks_received.inc()
discard ? node.blockPool.add(blockRoot, signedBlock)

# The block we received contains attestations, and we might not yet know about
# all of them. Let's add them to the attestation pool - in case they block
# is not yet resolved, neither will the attestations be!
# But please note that we only care about recent attestations.
# TODO shouldn't add attestations if the block turns out to be invalid..
let currentSlot = node.beaconClock.now.toSlot
if currentSlot.afterGenesis and
signedBlock.message.slot.epoch + 1 >= currentSlot.slot.epoch:
for attestation in signedBlock.message.body.attestations:
node.onAttestation(attestation)
ok()

proc onBeaconBlock*(node: BeaconNode, signedBlock: SignedBeaconBlock) =
# We received a block but don't know much about it yet - in particular, we
# don't know if it's part of the chain we're currently building.
discard node.storeBlock(signedBlock)

proc updateHead*(node: BeaconNode): BlockRef =
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve()
Expand Down
Loading

0 comments on commit 8760494

Please sign in to comment.