diff --git a/beacon_chain/logtrace.nim b/beacon_chain/logtrace.nim index 508170116d..c6aac7cfb2 100644 --- a/beacon_chain/logtrace.nim +++ b/beacon_chain/logtrace.nim @@ -4,7 +4,8 @@ # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. -import confutils, json, times, streams, os, strutils, options, chronicles +import confutils, json, times, streams, os, strutils, options, chronicles, + tables, sequtils import json_serialization const @@ -22,7 +23,7 @@ const type StartUpCommand* {.pure.} = enum - pubsub, attest + pubsub, asl, asr LogTraceConf* = object logFiles* {. @@ -35,15 +36,31 @@ type name: "sim-dir", defaultValue: "" }: string + netDir* {. + desc: "Specifies path to network build directory", + name: "net-dir", + defaultValue: "" }: string + + nodes* {. + desc: "Specifies node names which logs will be used", + name: "nodes" }: seq[string] + case cmd* {.command.}: StartUpCommand of pubsub: discard - of attest: + of asl: + discard + of asr: discard GossipDirection* = enum None, Incoming, Outgoing + NodeDirectory* = object + name*: string + path*: string + logs*: seq[string] + LogMessage* = object of RootObj level* {.serializedFieldName: "lvl" .}: string timestamp* {.serializedFieldName: "ts" .}: DateTime @@ -82,15 +99,24 @@ type indexInCommittee*: uint64 validator*: string + AttestationReceivedMessage* = object of LogMessage + attestation*: AttestationObject + head*: string + wallSlot*: uint64 + pcs*: string + GossipMessage* = object kind*: GossipDirection id*: string datetime*: DateTime processed*: bool - SaMessageType* = enum + SaMessageType* {.pure.} = enum AttestationSent, SlotStart + SRMessageType* {.pure.} = enum + AttestationSent, AttestationReceived + SlotAttMessage* = object case kind*: SaMessageType of SaMessageType.AttestationSent: @@ -98,6 +124,18 @@ type of SaMessageType.SlotStart: ssmsg*: SlotStartMessage + SRAttMessage* = object + case kind*: SRMessageType + of SRMessageType.AttestationSent: + asmsg*: AttestationSentMessage + of SRMessageType.AttestationReceived: + armsg*: AttestationReceivedMessage + + SRANode* = object + directory*: NodeDirectory + sends*: seq[AttestationSentMessage] + recvs*: TableRef[string, AttestationReceivedMessage] + proc readValue*(reader: var JsonReader, value: var DateTime) = let s = reader.readValue(string) try: @@ -151,8 +189,9 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] = res.add(m) inc(counter) if counter mod 10_000 == 0: - info "Processing file", file = file, lines_processed = counter, - lines_filtered = len(res) + info "Processing file", file = extractFilename(file), + lines_processed = counter, + lines_filtered = len(res) result = res except SerializationError as exc: @@ -163,6 +202,37 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] = finally: stream.close() +proc readLogFileForASRMessages(file: string, + srnode: var SRANode) = + var stream = newFileStream(file) + var line: string + var counter = 0 + try: + while not(stream.atEnd()): + line = stream.readLine() + let m = Json.decode(line, LogMessage, forwardCompatible = true) + if m.msg == "Attestation sent": + let sm = Json.decode(line, AttestationSentMessage, + forwardCompatible = true) + srnode.sends.add(sm) + elif m.msg == "Attestation received": + let rm = Json.decode(line, AttestationReceivedMessage, + forwardCompatible = true) + discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm) + inc(counter) + if counter mod 10_000 == 0: + info "Processing file", file = extractFilename(file), + lines_processed = counter, + sends_filtered = len(srnode.sends), + recvs_filtered = len(srnode.recvs) + except SerializationError as exc: + error "Serialization error while reading data from file", file = file, + errorMsg = exc.formatMsg(line) + except CatchableError as exc: + warn "Error reading data from file", file = file, errorMsg = exc.msg + finally: + stream.close() + proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] = # Because of times.DateTime object we forced to turn off [ProveInit] warnings # You can remove this pragmas when Nim compiler or times.nim will be fixed. @@ -201,6 +271,27 @@ iterator simDirectoryLogFiles*(simdir: string): string = break inc(index) +proc getDirectoryLogFiles*(builddir: string, + filter: seq[string]): seq[NodeDirectory] = + var res = newSeq[NodeDirectory]() + let absPath = absolutePath(builddir) + let dataPath = absPath & DirSep & "data" + if not dirExists(dataPath): + error "Invalid `network` data directory structure", + path = dataPath + quit(1) + + for dirPath in walkDirs(dataPath & DirSep & "*"): + let name = extractFilename(dirPath) + if (len(filter) == 0) or (name in filter): + var nodeDir = NodeDirectory(name: extractFilename(dirPath), + path: dirPath) + for filePath in walkFiles(dirPath & DirSep & "*.log"): + nodeDir.logs.add(extractFilename(filePath)) + if len(nodeDir.logs) > 0: + res.add(nodeDir) + return res + proc getMessage(logs: seq[GossipMessage], msg: GossipMessage): Option[GossipMessage] = {.push warning[ProveInit]: off.} @@ -291,8 +382,59 @@ proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) = slot_messages = slotMessagesCount, late_attestation_messages = lateAttsMessagesCount +proc toSimple*(s: seq[string]): string = + result = "[" & s.mapIt("'" & it & "'").join(", ") & "]" + +proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) = + info "Check for attestations send/receive messages" + if len(nodes) < 2: + error "Number of nodes' log files are not enough", nodes_count = len(nodes) + quit(1) + var srnodes = newSeq[SRANode]() + + for node in nodes: + var srnode = SRANode( + directory: node, + sends: newSeq[AttestationSentMessage](), + recvs: newTable[string, AttestationReceivedMessage]() + ) + info "Processing node", node = node.name + for logfile in node.logs: + let path = node.path & DirSep & logfile + info "Processing node's logfile", node = node.name, logfile = path + readLogFileForASRMessages(path, srnode) + srnodes.add(srnode) + + if len(nodes) < 2: + error "Number of nodes' log files are not enough", nodes_count = len(nodes) + quit(1) + + for i in 0 ..< len(srnodes): + var success = 0 + var failed = 0 + for item in srnodes[i].sends: + var k = (i + 1) mod len(srnodes) + var misses = newSeq[string]() + while k != i: + if item.attestation.signature notin srnodes[k].recvs: + misses.add(srnodes[k].directory.name) + k = (k + 1) mod len(srnodes) + + if len(misses) == 0: + inc(success) + else: + inc(failed) + info "Attestation was not received", sender = srnodes[i].directory.name, + signature = item.attestation.signature, + receivers = misses.toSimple(), send_stamp = item.timestamp + + info "Statistics for sender node", sender = srnodes[i].directory.name, + sucessfull_broadcasts = success, failed_broadcasts = failed, + total_broadcasts = len(srnodes[i].sends) + proc run(conf: LogTraceConf) = var logFiles: seq[string] + var logNodes: seq[NodeDirectory] if len(conf.logFiles) > 0: for item in conf.logFiles: @@ -303,15 +445,21 @@ proc run(conf: LogTraceConf) = if len(conf.simDir) > 0: for item in simDirectoryLogFiles(conf.simDir): logFiles.add(item) + logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes) + + if len(conf.netDir) > 0: + logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes) - if len(logFiles) == 0: + if len(logFiles) == 0 and len(logNodes) == 0: error "Log file sources not specified or not enough log files found" quit(1) if conf.cmd == StartUpCommand.pubsub: runPubsub(conf, logFiles) - elif conf.cmd == StartUpCommand.attest: + elif conf.cmd == StartUpCommand.asl: runAttSend(conf, logFiles) + elif conf.cmd == StartUpCommand.asr: + runAttSendReceive(conf, logNodes) when isMainModule: echo LogTraceHeader