Skip to content

Commit

Permalink
Revert "Implement log tracing of attestation send and receive message…
Browse files Browse the repository at this point in the history
…s. (#1361)" (#1389)

This reverts commit ac12af1.
  • Loading branch information
tersec authored Jul 28, 2020
1 parent a07dab3 commit 1220cc0
Showing 1 changed file with 8 additions and 156 deletions.
164 changes: 8 additions & 156 deletions beacon_chain/logtrace.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import confutils, json, times, streams, os, strutils, options, chronicles,
tables, sequtils
import confutils, json, times, streams, os, strutils, options, chronicles
import json_serialization

const
Expand All @@ -23,7 +22,7 @@ const

type
StartUpCommand* {.pure.} = enum
pubsub, asl, asr
pubsub, attest

LogTraceConf* = object
logFiles* {.
Expand All @@ -36,31 +35,15 @@ type
name: "sim-dir",
defaultValue: "" }: string

netDir* {.
desc: "Specifies path to network build directory",
name: "net-dir",
defaultValue: "" }: string

nodes* {.
desc: "Specifies node names which logs will be used",
name: "nodes" }: seq[string]

case cmd* {.command.}: StartUpCommand
of pubsub:
discard
of asl:
discard
of asr:
of attest:
discard

GossipDirection* = enum
None, Incoming, Outgoing

NodeDirectory* = object
name*: string
path*: string
logs*: seq[string]

LogMessage* = object of RootObj
level* {.serializedFieldName: "lvl" .}: string
timestamp* {.serializedFieldName: "ts" .}: DateTime
Expand Down Expand Up @@ -99,43 +82,22 @@ type
indexInCommittee*: uint64
validator*: string

AttestationReceivedMessage* = object of LogMessage
attestation*: AttestationObject
head*: string
wallSlot*: uint64
pcs*: string

GossipMessage* = object
kind*: GossipDirection
id*: string
datetime*: DateTime
processed*: bool

SaMessageType* {.pure.} = enum
SaMessageType* = enum
AttestationSent, SlotStart

SRMessageType* {.pure.} = enum
AttestationSent, AttestationReceived

SlotAttMessage* = object
case kind*: SaMessageType
of SaMessageType.AttestationSent:
asmsg*: AttestationSentMessage
of SaMessageType.SlotStart:
ssmsg*: SlotStartMessage

SRAttMessage* = object
case kind*: SRMessageType
of SRMessageType.AttestationSent:
asmsg*: AttestationSentMessage
of SRMessageType.AttestationReceived:
armsg*: AttestationReceivedMessage

SRANode* = object
directory*: NodeDirectory
sends*: seq[AttestationSentMessage]
recvs*: TableRef[string, AttestationReceivedMessage]

proc readValue*(reader: var JsonReader, value: var DateTime) =
let s = reader.readValue(string)
try:
Expand Down Expand Up @@ -189,9 +151,8 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] =
res.add(m)
inc(counter)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
lines_filtered = len(res)
info "Processing file", file = file, lines_processed = counter,
lines_filtered = len(res)
result = res

except SerializationError as exc:
Expand All @@ -202,37 +163,6 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] =
finally:
stream.close()

proc readLogFileForASRMessages(file: string,
srnode: var SRANode) =
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not(stream.atEnd()):
line = stream.readLine()
let m = Json.decode(line, LogMessage, forwardCompatible = true)
if m.msg == "Attestation sent":
let sm = Json.decode(line, AttestationSentMessage,
forwardCompatible = true)
srnode.sends.add(sm)
elif m.msg == "Attestation received":
let rm = Json.decode(line, AttestationReceivedMessage,
forwardCompatible = true)
discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm)
inc(counter)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
sends_filtered = len(srnode.sends),
recvs_filtered = len(srnode.recvs)
except SerializationError as exc:
error "Serialization error while reading data from file", file = file,
errorMsg = exc.formatMsg(line)
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()

proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
# Because of times.DateTime object we forced to turn off [ProveInit] warnings
# You can remove this pragmas when Nim compiler or times.nim will be fixed.
Expand Down Expand Up @@ -271,27 +201,6 @@ iterator simDirectoryLogFiles*(simdir: string): string =
break
inc(index)

proc getDirectoryLogFiles*(builddir: string,
filter: seq[string]): seq[NodeDirectory] =
var res = newSeq[NodeDirectory]()
let absPath = absolutePath(builddir)
let dataPath = absPath & DirSep & "data"
if not dirExists(dataPath):
error "Invalid `network` data directory structure",
path = dataPath
quit(1)

for dirPath in walkDirs(dataPath & DirSep & "*"):
let name = extractFilename(dirPath)
if (len(filter) == 0) or (name in filter):
var nodeDir = NodeDirectory(name: extractFilename(dirPath),
path: dirPath)
for filePath in walkFiles(dirPath & DirSep & "*.log"):
nodeDir.logs.add(extractFilename(filePath))
if len(nodeDir.logs) > 0:
res.add(nodeDir)
return res

proc getMessage(logs: seq[GossipMessage],
msg: GossipMessage): Option[GossipMessage] =
{.push warning[ProveInit]: off.}
Expand Down Expand Up @@ -382,59 +291,8 @@ proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
slot_messages = slotMessagesCount,
late_attestation_messages = lateAttsMessagesCount

proc toSimple*(s: seq[string]): string =
result = "[" & s.mapIt("'" & it & "'").join(", ") & "]"

proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
info "Check for attestations send/receive messages"
if len(nodes) < 2:
error "Number of nodes' log files are not enough", nodes_count = len(nodes)
quit(1)
var srnodes = newSeq[SRANode]()

for node in nodes:
var srnode = SRANode(
directory: node,
sends: newSeq[AttestationSentMessage](),
recvs: newTable[string, AttestationReceivedMessage]()
)
info "Processing node", node = node.name
for logfile in node.logs:
let path = node.path & DirSep & logfile
info "Processing node's logfile", node = node.name, logfile = path
readLogFileForASRMessages(path, srnode)
srnodes.add(srnode)

if len(nodes) < 2:
error "Number of nodes' log files are not enough", nodes_count = len(nodes)
quit(1)

for i in 0 ..< len(srnodes):
var success = 0
var failed = 0
for item in srnodes[i].sends:
var k = (i + 1) mod len(srnodes)
var misses = newSeq[string]()
while k != i:
if item.attestation.signature notin srnodes[k].recvs:
misses.add(srnodes[k].directory.name)
k = (k + 1) mod len(srnodes)

if len(misses) == 0:
inc(success)
else:
inc(failed)
info "Attestation was not received", sender = srnodes[i].directory.name,
signature = item.attestation.signature,
receivers = misses.toSimple(), send_stamp = item.timestamp

info "Statistics for sender node", sender = srnodes[i].directory.name,
sucessfull_broadcasts = success, failed_broadcasts = failed,
total_broadcasts = len(srnodes[i].sends)

proc run(conf: LogTraceConf) =
var logFiles: seq[string]
var logNodes: seq[NodeDirectory]

if len(conf.logFiles) > 0:
for item in conf.logFiles:
Expand All @@ -445,21 +303,15 @@ proc run(conf: LogTraceConf) =
if len(conf.simDir) > 0:
for item in simDirectoryLogFiles(conf.simDir):
logFiles.add(item)
logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes)

if len(conf.netDir) > 0:
logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes)

if len(logFiles) == 0 and len(logNodes) == 0:
if len(logFiles) == 0:
error "Log file sources not specified or not enough log files found"
quit(1)

if conf.cmd == StartUpCommand.pubsub:
runPubsub(conf, logFiles)
elif conf.cmd == StartUpCommand.asl:
elif conf.cmd == StartUpCommand.attest:
runAttSend(conf, logFiles)
elif conf.cmd == StartUpCommand.asr:
runAttSendReceive(conf, logNodes)

when isMainModule:
echo LogTraceHeader
Expand Down

0 comments on commit 1220cc0

Please sign in to comment.