From cec52f960deb98e4a810d0386a36645f26da0e19 Mon Sep 17 00:00:00 2001 From: ia Date: Wed, 6 Dec 2017 09:02:33 -0600 Subject: [PATCH] problem: implement display logging cmd to log status, events solution: able to provide rolling status logs at glog.D(logger.Warn) verbosity, and event logs for blockchain, headerchain, mining, and downloading --- cmd/geth/cmd.go | 409 +++++++++++++++++++++++++---------- core/blockchain.go | 22 +- core/blockchain_test.go | 8 +- core/events.go | 25 ++- core/headerchain.go | 28 ++- eth/downloader/api.go | 19 +- eth/downloader/downloader.go | 7 + 7 files changed, 398 insertions(+), 120 deletions(-) diff --git a/cmd/geth/cmd.go b/cmd/geth/cmd.go index 613ec7f04..48fb28607 100644 --- a/cmd/geth/cmd.go +++ b/cmd/geth/cmd.go @@ -20,6 +20,18 @@ import ( "bufio" "errors" "fmt" + "io" + "io/ioutil" + "math/big" + "os" + "os/signal" + "path/filepath" + "runtime" + "strconv" + "strings" + "syscall" + "time" + "github.com/ethereumproject/ethash" "github.com/ethereumproject/go-ethereum/core" "github.com/ethereumproject/go-ethereum/core/state" @@ -33,17 +45,6 @@ import ( "github.com/ethereumproject/go-ethereum/pow" "github.com/ethereumproject/go-ethereum/rlp" "gopkg.in/urfave/cli.v1" - "io" - "io/ioutil" - "math/big" - "os" - "os/signal" - "path/filepath" - "runtime" - "strconv" - "strings" - "syscall" - "time" ) const ( @@ -115,6 +116,7 @@ func StartNode(stack *node.Node) { }() } +// ImportChain imports a blockchain. func ImportChain(chain *core.BlockChain, fn string) error { // Watch for Ctrl-C while the import is running. // If a signal is received, the import will stop at the next batch. @@ -756,7 +758,6 @@ type LogStatusFeatAvailability int const ( StatusFeatAvailable LogStatusFeatAvailability = iota StatusFeatRegistered - StatusFeatNonexistent ) // availableLogStatusFeatures stores state of implemented log STATUS features. @@ -765,6 +766,29 @@ var availableLogStatusFeatures = map[string]LogStatusFeatAvailability{ "sync": StatusFeatAvailable, } +type lsMode int + +const ( + lsModeDiscover lsMode = iota + lsModeFullSync + lsModeFastSync + lsModeImport +) + +var lsModeName = []string{ + "Discover", + "FullSync", + "FastSync", + "Import", +} + +var lsModeIcon = []string{ + "", + "︎◉", + "◎", + "▶︎", +} + // dispatchStatusLogs handle parsing --log-status=argument and toggling appropriate goroutine status feature logging. func dispatchStatusLogs(ctx *cli.Context, ethe *eth.Ethereum) { flagName := aliasableName(LogStatusFlag.Name, ctx) @@ -781,13 +805,18 @@ func dispatchStatusLogs(ctx *cli.Context, ethe *eth.Ethereum) { // If possible, split sync=60 into ["sync", "60"], otherwise yields ["sync"], ["60"], or ["someothernonsense"] eqs := strings.Split(p, "=") + if len(eqs) < 2 { + glog.Errorf("Invalid log status value: %v. Must be comma-separated pairs of module=interval.", eqs) + os.Exit(1) + } // Catch unavailable and duplicate status feature logs - if availableLogStatusFeatures[eqs[0]] == StatusFeatNonexistent { - glog.Fatalf("%v: %v: unavailable status feature by name of '%v'", flagName, ErrInvalidFlag, eqs[0]) - } - if availableLogStatusFeatures[eqs[0]] == StatusFeatRegistered { - glog.Fatalf("%v: %v: duplicate status feature by name of '%v'", flagName, ErrInvalidFlag, eqs[0]) + if status, ok := availableLogStatusFeatures[eqs[0]]; !ok { + glog.Errorf("%v: %v: unavailable status feature by name of '%v'", flagName, ErrInvalidFlag, eqs[0]) + os.Exit(1) + } else if status == StatusFeatRegistered { + glog.Errorf("%v: %v: duplicate status feature by name of '%v'", flagName, ErrInvalidFlag, eqs[0]) + os.Exit(1) } // If user just uses "sync" instead of "sync=42", append empty string and delegate to each status log function how to handle it @@ -797,7 +826,7 @@ func dispatchStatusLogs(ctx *cli.Context, ethe *eth.Ethereum) { switch eqs[0] { case "sync": availableLogStatusFeatures["sync"] = StatusFeatRegistered - go runStatusSyncLogs(ethe, eqs[1], ctx.GlobalInt(aliasableName(MaxPeersFlag.Name, ctx))) + go runStatusSyncLogs(ctx, ethe, eqs[1], ctx.GlobalInt(aliasableName(MaxPeersFlag.Name, ctx))) } } } @@ -805,133 +834,293 @@ func dispatchStatusLogs(ctx *cli.Context, ethe *eth.Ethereum) { // runStatusSyncLogs starts STATUS SYNC logging at a given interval. // It should be run as a goroutine. // eg. --log-status="sync=42" logs SYNC information every 42 seconds -func runStatusSyncLogs(e *eth.Ethereum, interval string, maxPeers int) { - - // Establish default interval. +func runStatusSyncLogs(ctx *cli.Context, e *eth.Ethereum, interval string, maxPeers int) { + // Establish default interval and parse desired interval from context. + // Includes convenience notifications for UI/UX. intervalI := 60 - if interval != "" { i, e := strconv.Atoi(interval) if e != nil { - glog.Fatalf("STATUS SYNC %v: could not parse argument: %v", e, interval) + glog.Fatalf("SYNC %v: could not parse argument: %v", e, interval) } if i < 1 { - glog.Fatalf("STATUS SYNC interval value must be a positive integer, got: %d", i) + glog.Fatalf("SYNC interval value must be a positive integer, got: %d", i) } intervalI = i } + glog.V(logger.Info).Infof("Rolling SYNC log interval set: %d seconds", intervalI) + + // Only use severity=warn if --log-status not in use (ie using defaults) + statIntervalNotice := fmt.Sprintf("Rolling SYNC status logs set to every %d seconds. ", intervalI) + if !ctx.GlobalIsSet(LogStatusFlag.Name) { + statIntervalNotice += fmt.Sprintf("You can adjust this with the --%s flag.", LogStatusFlag.Name) + glog.D(logger.Error).Warnln(statIntervalNotice) + // statIntervalNoticeFn = glog.D(logger.Error).Warnf + } else { + glog.D(logger.Error).Infoln(statIntervalNotice) + } - glog.V(logger.Info).Infof("STATUS SYNC Log interval set: %d seconds", intervalI) - + // Set up ticker based on established interval. tickerInterval := time.Second * time.Duration(int32(intervalI)) ticker := time.NewTicker(tickerInterval) + var chainEventLastSent time.Time + // Bookmark vars. var lastLoggedBlockNumber uint64 + + var lsMode = lsModeDiscover // init + var lsModeN int + var lsModeDiscoverSpinners = []string{"➫", "➬", "➭"} + + var dominoes = []string{"🁣", "🁤", "🁥", "🁦", "🁭", "🁴", "🁻", "🁼", "🂃", "🂄", "🂋", "🂌", "🂓"} // 🁣🁤🁥🁦🁭🁴🁻🁼🂃🂄🂋🂌🂓 + chainIcon := "◼⋯⋯" + logger.ColorGreen("◼") + forkIcon := "◼⋯⦦" + logger.ColorGreen("◼") + headerIcon := "◼⋯⋯" + logger.ColorGreen("❐") + downloaderIcon := "◼⋯⋯" + logger.ColorGreen("⬇") + minedIcon := "◼⋯⋯" + logger.ColorGreen("⟠") + var sigc = make(chan os.Signal, 1) signal.Notify(sigc, os.Interrupt, syscall.SIGTERM) defer signal.Stop(sigc) - for { - select { - case <-ticker.C: - lenPeers := e.Downloader().GetPeers().Len() - - _, current, height, _, _ := e.Downloader().Progress() // origin, current, height, pulled, known - mode := e.Downloader().GetMode() - - // Get our head block - blockchain := e.BlockChain() - currentBlockHex := blockchain.CurrentBlock().Hash().Hex() - - // Discover -> not synchronising (searching for peers) - // FullSync/FastSync -> synchronising - // Import -> synchronising, at full height - fMode := "Discover" - fOfHeight := fmt.Sprintf(" of #%7d", height) - - // Calculate and format percent sync of known height - heightRatio := float64(current) / float64(height) - heightRatio = heightRatio * 100 - fHeightRatio := fmt.Sprintf("(%4.2f%%)", heightRatio) - - // Wait until syncing because real dl mode will not be engaged until then - if e.Downloader().Synchronising() { - switch mode { - case downloader.FullSync: - fMode = "FullSync" - case downloader.FastSync: - fMode = "FastSync" - currentBlockHex = blockchain.CurrentFastBlock().Hash().Hex() + // Should listen for events. + // Proof of concept create event subscription + ethEvents := e.EventMux().Subscribe( + // ChainEvent is called when a single block is inserted into the local blockchain. + // core.ChainEvent{}, + // ChainSideEvent is called when a forked block is inserted into the local blockchain. + core.ChainSideEvent{}, + // NewMinedBlockEvent is called when a new block is mined locally. + core.NewMinedBlockEvent{}, + // ChainInsertEvent is called when a batch of block is finished processing through the bc.InsertChain fn. + // It includes statistics. Processed, queued, ignored, txcount, etc. + core.ChainInsertEvent{}, + // HeaderChainInsertEvent is called when headers are inserted into the headerchain, ie. fastsync. + core.HeaderChainInsertEvent{}, + // StartEvent is called when a peer is selected for synchronisation and sync begins. + downloader.StartEvent{}, + // DoneEvent is called when synchronisation with a peer finishes without error. + downloader.DoneEvent{}, + // FailedEvent is called when synchronisation with a peer finishes with an error. + downloader.FailedEvent{}, + ) + greenParenify := func(s string) string { + return logger.ColorGreen("⟪") + s + logger.ColorGreen("⟫") + } + redParenify := func(s string) string { + return logger.ColorRed("⟪") + s + logger.ColorRed("⟫") + } + handleDownloaderEvent := func(e interface{}) { + s := downloaderIcon + " " + switch d := e.(type) { + case downloader.StartEvent: + s += "Start " + greenParenify(fmt.Sprintf("%s", d.Peer)) + " hash=" + greenParenify(d.Hash.Hex()[:9]+"…") + " TD=" + greenParenify(fmt.Sprintf("%v", d.TD)) + glog.D(logger.Info).Infoln(s) + case downloader.DoneEvent: + s += "Done " + greenParenify(fmt.Sprintf("%s", d.Peer)) + " hash=" + greenParenify(d.Hash.Hex()[:9]+"…") + " TD=" + greenParenify(fmt.Sprintf("%v", d.TD)) + glog.D(logger.Info).Infoln(s) + case downloader.FailedEvent: + s += "Fail " + greenParenify(fmt.Sprintf("%s", d.Peer)) + " " + logger.ColorRed("err") + "=" + redParenify(d.Err.Error()) + glog.D(logger.Info).Warnln(s) + } + } + + go func() { + for e := range ethEvents.Chan() { + switch d := e.Data.(type) { + // case core.ChainEvent: + // glog.D(logger.Info).Infof("chainevent time=%v block=%v", e.Time, d.Block.NumberU64()) + case core.ChainInsertEvent: + glog.D(logger.Info).Infof(chainIcon+" Insert "+logger.ColorGreen("blocks")+"=%s "+logger.ColorGreen("◼")+"=%s "+logger.ColorGreen("took")+"=%s", + greenParenify(fmt.Sprintf("processed=%4d queued=%4d ignored=%4d txs=%4d", d.Processed, d.Queued, d.Ignored, d.TxCount)), + greenParenify(fmt.Sprintf("n=%8d hash=%s… time=%v ago", d.LastNumber, d.LastHash.Hex()[:9], time.Since(d.LatestBlockTime).Round(time.Millisecond))), + greenParenify(fmt.Sprintf("%v", d.Elasped.Round(time.Millisecond))), + ) + if bool(glog.D(logger.Info)) { + chainEventLastSent = time.Now() } + case core.ChainSideEvent: + glog.D(logger.Info).Infof(forkIcon+" Insert "+logger.ColorGreen("forked block")+"=%s", greenParenify(fmt.Sprintf("n=%8d hash=%s…", d.Block.NumberU64(), d.Block.Hash().Hex()[:9]))) + case core.HeaderChainInsertEvent: + glog.D(logger.Info).Infof(headerIcon+" Insert "+logger.ColorGreen("headers")+"=%s "+logger.ColorGreen("❐")+"=%s"+logger.ColorGreen("took")+"=%s", + greenParenify(fmt.Sprintf("processed=%4d ignored=%4d", d.Processed, d.Ignored)), + greenParenify(fmt.Sprintf("n=%4d hash=%s…", d.LastNumber, d.LastHash.Hex()[:9])), + greenParenify(fmt.Sprintf("%v", d.Elasped.Round(time.Microsecond))), + ) + if bool(glog.D(logger.Info)) { + chainEventLastSent = time.Now() + } + case core.NewMinedBlockEvent: + glog.D(logger.Info).Infof(minedIcon+" Mined "+logger.ColorGreen("◼")+"="+greenParenify(fmt.Sprintf("n=%8d hash=%s… coinbase=%s… txs=%3d uncles=%d", + d.Block.NumberU64(), + d.Block.Hash().Hex()[:9], + d.Block.Coinbase().Hex()[:9], + len(d.Block.Transactions()), + len(d.Block.Uncles()), + ))) + default: + handleDownloaderEvent(d) } - if current >= height && !(current == 0 && height == 0) { - fMode = "Import " // with spaces to make same length as Discover, FastSync, FullSync - fOfHeight = strings.Repeat(" ", 12) - fHeightRatio = strings.Repeat(" ", 7) - } - if height == 0 { - fOfHeight = strings.Repeat(" ", 12) - fHeightRatio = strings.Repeat(" ", 7) + } + }() + + printIntervalStatusLog := func() { + lenPeers := e.Downloader().GetPeers().Len() + + rtt, ttl, conf := e.Downloader().Qos() + confS := fmt.Sprintf("%01.2f", conf) + qosDisplay := fmt.Sprintf("rtt=%v ttl=%v conf=%s", rtt.Round(time.Millisecond), ttl.Round(time.Millisecond), confS) + + _, current, height, _, _ := e.Downloader().Progress() // origin, current, height, pulled, known + mode := e.Downloader().GetMode() + if mode == downloader.FastSync { + current = e.BlockChain().CurrentFastBlock().NumberU64() + } + + // Get our head block + blockchain := e.BlockChain() + currentBlockHex := blockchain.CurrentBlock().Hash().Hex() + + // Discover -> not synchronising (searching for peers) + // FullSync/FastSync -> synchronising + // Import -> synchronising, at full height + fOfHeight := fmt.Sprintf("%7d", height) + + // Calculate and format percent sync of known height + heightRatio := float64(current) / float64(height) + heightRatio = heightRatio * 100 + fHeightRatio := fmt.Sprintf("%4.2f%%", heightRatio) + + // Wait until syncing because real dl mode will not be engaged until then + lsMode = lsModeDiscover + if e.Downloader().Synchronising() { + switch mode { + case downloader.FullSync: + lsMode = lsModeFullSync + case downloader.FastSync: + lsMode = lsModeFastSync + currentBlockHex = blockchain.CurrentFastBlock().Hash().Hex() } + } + importMode := lenPeers > 0 && lsMode == lsModeDiscover && current >= height && !(current == 0 && height == 0) + if importMode { + lsMode = lsModeImport + fOfHeight = "" // strings.Repeat(" ", 12) + fHeightRatio = "" // strings.Repeat(" ", 7) + } + if height == 0 { + fOfHeight = "" // strings.Repeat(" ", 12) + fHeightRatio = "" // strings.Repeat(" ", 7) + } - // Calculate block stats for interval - numBlocksDiff := current - lastLoggedBlockNumber - numTxsDiff := 0 - mGas := new(big.Int) - - var numBlocksDiffPerSecond uint64 - var numTxsDiffPerSecond int - var mGasPerSecond = new(big.Int) - - if numBlocksDiff > 0 && numBlocksDiff != current { - for i := lastLoggedBlockNumber; i <= current; i++ { - b := blockchain.GetBlockByNumber(i) - if b != nil { - numTxsDiff += b.Transactions().Len() - mGas = new(big.Int).Add(mGas, b.GasUsed()) + // Calculate block stats for interval + numBlocksDiff := current - lastLoggedBlockNumber + numTxsDiff := 0 + mGas := new(big.Int) + + var numBlocksDiffPerSecond uint64 + var numTxsDiffPerSecond int + var mGasPerSecond = new(big.Int) + + var dominoGraph string + var nDom int + if numBlocksDiff > 0 && numBlocksDiff != current { + for i := lastLoggedBlockNumber + 1; i <= current; i++ { + b := blockchain.GetBlockByNumber(i) + if b != nil { + txLen := b.Transactions().Len() + // Add to tallies + numTxsDiff += txLen + mGas = new(big.Int).Add(mGas, b.GasUsed()) + // Domino effect + if lsMode == lsModeImport { + if txLen > len(dominoes)-1 { + // prevent slice out of bounds + txLen = len(dominoes) - 1 + } + if nDom <= 20 { + dominoGraph += dominoes[txLen] + } + nDom++ } } } - - // Convert to per-second stats - // FIXME(?): Some degree of rounding will happen. - // For example, if interval is 10s and we get 6 blocks imported in that span, - // stats will show '0' blocks/second. Looks a little strange; but on the other hand, - // precision costs visual space, and normally just looks weird when starting up sync or - // syncing slowly. - numBlocksDiffPerSecond = numBlocksDiff / uint64(intervalI) - - // Don't show initial current / per second val - if lastLoggedBlockNumber == 0 { - numBlocksDiffPerSecond = 0 + if nDom > 20 { + dominoGraph += "…" } + } + dominoGraph = logger.ColorGreen(dominoGraph) + + // Convert to per-second stats + // FIXME(?): Some degree of rounding will happen. + // For example, if interval is 10s and we get 6 blocks imported in that span, + // stats will show '0' blocks/second. Looks a little strange; but on the other hand, + // precision costs visual space, and normally just looks weird when starting up sync or + // syncing slowly. + numBlocksDiffPerSecond = numBlocksDiff / uint64(intervalI) + + // Don't show initial current / per second val + if lastLoggedBlockNumber == 0 { + numBlocksDiffPerSecond = 0 + numBlocksDiff = 0 + } - // Divide by interval to yield per-second stats - numTxsDiffPerSecond = numTxsDiff / intervalI - mGasPerSecond = new(big.Int).Div(mGas, big.NewInt(int64(intervalI))) - mGasPerSecond = new(big.Int).Div(mGasPerSecond, big.NewInt(1000000)) - mGasPerSecondI := mGasPerSecond.Int64() + // Divide by interval to yield per-second stats + numTxsDiffPerSecond = numTxsDiff / intervalI + mGasPerSecond = new(big.Int).Div(mGas, big.NewInt(int64(intervalI))) + mGasPerSecond = new(big.Int).Div(mGasPerSecond, big.NewInt(1000000)) + mGasPerSecondI := mGasPerSecond.Int64() - // Update last logged current block number - lastLoggedBlockNumber = current + // Update last logged current block number + lastLoggedBlockNumber = current - // Format head block hex for printing (eg. d4e…fa3) - cbhexstart := currentBlockHex[2:5] // trim off '0x' prefix - cbhexend := currentBlockHex[(len(currentBlockHex) - 3):] + // Format head block hex for printing (eg. d4e…fa3) + cbhexstart := currentBlockHex[:9] // trim off '0x' prefix - blockprogress := fmt.Sprintf("#%7d%s", current, fOfHeight) - cbhexdisplay := fmt.Sprintf("%s…%s", cbhexstart, cbhexend) - peersdisplay := fmt.Sprintf("%2d/%2d peers", lenPeers, maxPeers) - blocksprocesseddisplay := fmt.Sprintf("%4d/%4d/%2d blks/txs/mgas sec", numBlocksDiffPerSecond, numTxsDiffPerSecond, mGasPerSecondI) + localHeadHeight := fmt.Sprintf("#%7d", current) + localHeadHex := fmt.Sprintf("%s…", cbhexstart) + peersOfMax := fmt.Sprintf("%2d/%2d peers", lenPeers, maxPeers) + domOrHeight := fOfHeight + " " + fHeightRatio + if len(strings.Replace(domOrHeight, " ", "", -1)) != 0 { + domOrHeight = logger.ColorGreen("height") + "=" + greenParenify(domOrHeight) + } else { + domOrHeight = "" + } + var blocksprocesseddisplay string + qosDisplayable := logger.ColorGreen("qos") + "=" + greenParenify(qosDisplay) + if lsMode != lsModeImport { + blocksprocesseddisplay = logger.ColorGreen("~") + greenParenify(fmt.Sprintf("%4d blks %4d txs %2d mgas "+logger.ColorGreen("/sec"), numBlocksDiffPerSecond, numTxsDiffPerSecond, mGasPerSecondI)) + } else { + blocksprocesseddisplay = logger.ColorGreen("+") + greenParenify(fmt.Sprintf("%4d blks %4d txs %8d mgas", numBlocksDiff, numTxsDiff, mGas.Uint64())) + domOrHeight = dominoGraph + qosDisplayable = "" + } + + // Log to ERROR. + headDisplay := greenParenify(localHeadHeight + " " + localHeadHex) + peerDisplay := greenParenify(peersOfMax) - // Log to ERROR. - // This allows maximum user optionality for desired integration with rest of event-based logging. - glog.V(logger.Error).Infof("STATUS SYNC %s %s %s %s %s %s", fMode, blockprogress, fHeightRatio, cbhexdisplay, blocksprocesseddisplay, peersdisplay) + modeIcon := logger.ColorGreen(lsModeIcon[lsMode]) + if lsMode == lsModeDiscover { + modeIcon = lsModeDiscoverSpinners[lsModeN%3] + } + modeIcon = logger.ColorGreen(modeIcon) + lsModeN++ + + // This allows maximum user optionality for desired integration with rest of event-based logging. + glog.D(logger.Warn).Infof("SYNC %s "+modeIcon+"%s %s "+logger.ColorGreen("✌︎︎︎")+"%s %s %s", lsModeName[lsMode], headDisplay, blocksprocesseddisplay, peerDisplay, domOrHeight, qosDisplayable) + } + for { + select { + case <-ticker.C: + if time.Since(chainEventLastSent) > time.Duration(time.Second*time.Duration(int32(intervalI/2))) { + printIntervalStatusLog() + } case <-sigc: // Listen for interrupt ticker.Stop() - glog.V(logger.Debug).Infoln("STATUS SYNC Stopping logging.") + glog.D(logger.Warn).Warnln("SYNC Stopping.") return } } diff --git a/core/blockchain.go b/core/blockchain.go index 252d7835f..8220fa822 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -134,7 +134,7 @@ func NewBlockChain(chainDb ethdb.Database, config *ChainConfig, pow pow.PoW, mux gv := func() HeaderValidator { return bc.Validator() } var err error - bc.hc, err = NewHeaderChain(chainDb, config, gv, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(chainDb, config, mux, gv, bc.getProcInterrupt) if err != nil { return nil, err } @@ -182,7 +182,7 @@ func NewBlockChainDryrun(chainDb ethdb.Database, config *ChainConfig, pow pow.Po gv := func() HeaderValidator { return bc.Validator() } var err error - bc.hc, err = NewHeaderChain(chainDb, config, gv, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(chainDb, config, mux, gv, bc.getProcInterrupt) if err != nil { return nil, err } @@ -208,6 +208,10 @@ func NewBlockChainDryrun(chainDb ethdb.Database, config *ChainConfig, pow pow.Po return bc, nil } +func (self *BlockChain) GetEventMux() *event.TypeMux { + return self.eventMux +} + func (self *BlockChain) getProcInterrupt() bool { return atomic.LoadInt32(&self.procInterrupt) == 1 } @@ -1386,6 +1390,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (chainIndex int, err err defer close(nonceAbort) txcount := 0 + var latestBlockTime time.Time for i, block := range chain { if atomic.LoadInt32(&self.procInterrupt) == 1 { glog.V(logger.Debug).Infoln("Premature abort during block chain processing") @@ -1482,6 +1487,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (chainIndex int, err err if err != nil { return i, err } + latestBlockTime = time.Unix(block.Time().Int64(), 0) switch status { case CanonStatTy: @@ -1511,9 +1517,19 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (chainIndex int, err err stats.processed++ } - if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) { + if stats.queued > 0 || stats.processed > 0 || stats.ignored > 0 { tend := time.Since(tstart) start, end := chain[0], chain[len(chain)-1] + events = append(events, ChainInsertEvent{ + stats.processed, + stats.queued, + stats.ignored, + txcount, + end.NumberU64(), + end.Hash(), + tend, + latestBlockTime, + }) if logger.MlogEnabled() { mlogBlockchain.Send(mlogBlockchainInsertBlocks.SetDetailValues( stats.processed, diff --git a/core/blockchain_test.go b/core/blockchain_test.go index fa9f187f2..23da1fd61 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -34,10 +34,16 @@ import ( "github.com/ethereumproject/go-ethereum/crypto" "github.com/ethereumproject/go-ethereum/ethdb" "github.com/ethereumproject/go-ethereum/event" + "github.com/ethereumproject/go-ethereum/logger/glog" "github.com/ethereumproject/go-ethereum/rlp" "github.com/hashicorp/golang-lru" ) +func init() { + // Disable any display logs for tests. + glog.SetD(0) +} + // GenesisBlockForTesting creates a block in which addr has the given wei balance. // The state trie of the block is written to db. the passed db needs to contain a state root func GenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big.Int) *types.Block { @@ -496,7 +502,7 @@ func chm(t testing.TB, genesis *types.Block, db ethdb.Database) *BlockChain { } valFn := func() HeaderValidator { return bc.Validator() } var err error - bc.hc, err = NewHeaderChain(db, config, valFn, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(db, config, bc.eventMux, valFn, bc.getProcInterrupt) if err != nil { t.Fatal(err) } diff --git a/core/events.go b/core/events.go index 7277e637d..0170ae14e 100644 --- a/core/events.go +++ b/core/events.go @@ -19,6 +19,8 @@ package core import ( "math/big" + "time" + "github.com/ethereumproject/go-ethereum/common" "github.com/ethereumproject/go-ethereum/core/types" "github.com/ethereumproject/go-ethereum/core/vm" @@ -67,11 +69,32 @@ type ChainSideEvent struct { Logs vm.Logs } +// TODO: no usages found in project files type PendingBlockEvent struct { Block *types.Block Logs vm.Logs } +type ChainInsertEvent struct { + Processed int + Queued int + Ignored int + TxCount int + LastNumber uint64 + LastHash common.Hash + Elasped time.Duration + LatestBlockTime time.Time +} + +type HeaderChainInsertEvent struct { + Processed int + Ignored int + LastNumber uint64 + LastHash common.Hash + Elasped time.Duration +} + +// TODO: no usages found in project files type ChainUncleEvent struct { Block *types.Block } @@ -82,4 +105,4 @@ type GasPriceChanged struct{ Price *big.Int } // Mining operation events type StartMining struct{} -type TopMining struct{} +type StopMining struct{} diff --git a/core/headerchain.go b/core/headerchain.go index 477793d99..1af35f4e2 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -29,6 +29,7 @@ import ( "github.com/ethereumproject/go-ethereum/common" "github.com/ethereumproject/go-ethereum/core/types" "github.com/ethereumproject/go-ethereum/ethdb" + "github.com/ethereumproject/go-ethereum/event" "github.com/ethereumproject/go-ethereum/logger" "github.com/ethereumproject/go-ethereum/logger/glog" "github.com/ethereumproject/go-ethereum/pow" @@ -56,6 +57,7 @@ type HeaderChain struct { rand *mrand.Rand getValidator getHeaderValidatorFn + eventMux *event.TypeMux } // getHeaderValidatorFn returns a HeaderValidator interface @@ -65,7 +67,7 @@ type getHeaderValidatorFn func() HeaderValidator // getValidator should return the parent's validator // procInterrupt points to the parent's interrupt semaphore // wg points to the parent's shutdown wait group -func NewHeaderChain(chainDb ethdb.Database, config *ChainConfig, getValidator getHeaderValidatorFn, procInterrupt func() bool) (*HeaderChain, error) { +func NewHeaderChain(chainDb ethdb.Database, config *ChainConfig, mux *event.TypeMux, getValidator getHeaderValidatorFn, procInterrupt func() bool) (*HeaderChain, error) { headerCache, _ := lru.New(headerCacheLimit) tdCache, _ := lru.New(tdCacheLimit) @@ -77,6 +79,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *ChainConfig, getValidator ge hc := &HeaderChain{ config: config, + eventMux: mux, chainDb: chainDb, headerCache: headerCache, tdCache: tdCache, @@ -207,6 +210,7 @@ type WhCallback func(*types.Header) error // because nonces can be verified sparsely, not needing to check each. func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, writeHeader WhCallback) (int, error) { // Collect some import statistics to report on + var events []interface{} stats := struct{ processed, ignored int }{} start := time.Now() @@ -307,8 +311,18 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w } // Report some public statistics so the user has a clue what's going on first, last := chain[0], chain[len(chain)-1] + elapsed := time.Since(start) glog.V(logger.Info).Infof("imported %d header(s) (%d ignored) in %v. #%v [%x… / %x…]", stats.processed, stats.ignored, - time.Since(start), last.Number, first.Hash().Bytes()[:4], last.Hash().Bytes()[:4]) + elapsed, last.Number, first.Hash().Bytes()[:4], last.Hash().Bytes()[:4]) + + events = append(events, HeaderChainInsertEvent{ + Processed: stats.processed, + Ignored: stats.ignored, + LastNumber: last.Number.Uint64(), + LastHash: last.Hash(), + Elasped: elapsed, + }) + go hc.postChainEvents(events) return 0, nil } @@ -489,6 +503,16 @@ func (hc *HeaderChain) SetGenesis(head *types.Header) { hc.genesisHeader = head } +// postChainEvents iterates over the events generated by a chain insertion and +// posts them into the event mux. +func (hc *HeaderChain) postChainEvents(events []interface{}) { + // post event logs for further processing + for _, event := range events { + // Fire the insertion events individually + hc.eventMux.Post(event) + } +} + // headerValidator is responsible for validating block headers // // headerValidator implements HeaderValidator. diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 85e8c42b9..22adc91d7 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -22,11 +22,24 @@ import ( "github.com/ethereumproject/go-ethereum/event" "github.com/ethereumproject/go-ethereum/rpc" + "github.com/ethereumproject/go-ethereum/common" + "math/big" ) -type DoneEvent struct{} -type StartEvent struct{} -type FailedEvent struct{ Err error } +type DoneEvent struct{ + Peer *peer + Hash common.Hash + TD *big.Int +} +type StartEvent struct{ + Peer *peer + Hash common.Hash + TD *big.Int +} +type FailedEvent struct{ + Peer *peer + Err error +} // PublicDownloaderAPI provides an API which gives information about the current synchronisation status. // It offers only methods that operates on data that can be available to anyone without security risks. diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 2106798ad..8b1b210ba 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -246,6 +246,13 @@ func (d *Downloader) Progress() (uint64, uint64, uint64, uint64, uint64) { return d.syncStatsChainOrigin, current, d.syncStatsChainHeight, d.syncStatsStateDone, d.syncStatsStateDone + pendingStates } +func (d *Downloader) Qos() (rtt time.Duration, ttl time.Duration, conf float64) { + rtt = d.requestRTT() + ttl = d.requestTTL() + conf = float64(d.rttConfidence) / 1000000.0 + return +} + // Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { return atomic.LoadInt32(&d.synchronising) > 0