From b434296fe9f7913102989d52b9dd639ed63eaefb Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 8 Nov 2021 11:00:36 +0800 Subject: [PATCH 1/4] Optimized update validator up time logic, added debug trace. --- projection/validator/validator.go | 105 +++++++++++++++++++++++- projection/validator/view/validators.go | 58 +++++++++++++ 2 files changed, 160 insertions(+), 3 deletions(-) diff --git a/projection/validator/validator.go b/projection/validator/validator.go index f7d8fdd5..8ec61090 100644 --- a/projection/validator/validator.go +++ b/projection/validator/validator.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "os" "strconv" "time" @@ -34,6 +35,16 @@ type Validator struct { } func NewValidator(logger applogger.Logger, rdbConn rdb.Conn, conNodeAddressPrefix string) *Validator { + + debugFile, err := os.OpenFile("validator.csv", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + panic(err) + } + defer debugFile.Close() + if _, err = debugFile.WriteString("Height,projectValidatorView,projectValidatorActivitiesView,ListAll,BlockCreated-insertCommit,BlockCreated-updateValidator,BlockCreated-Total,Total\n"); err != nil { + panic(err) + } + return &Validator{ rdbprojectionbase.NewRDbBase(rdbConn.ToHandle(), "Validator"), @@ -71,6 +82,17 @@ func (projection *Validator) OnInit() error { } func (projection *Validator) HandleEvents(height int64, events []event_entity.Event) error { + + debugFile, err := os.OpenFile("validator.csv", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + panic(err) + } + defer debugFile.Close() + if _, err = debugFile.WriteString(strconv.FormatInt(height, 10) + ","); err != nil { + panic(err) + } + + debugStart := time.Now() rdbTx, err := projection.rdbConn.Begin() if err != nil { return fmt.Errorf("error beginning transaction: %v", err) @@ -108,6 +130,14 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error projecting validator view: %v", projectErr) } + debug1 := time.Now() + debugDiff1 := debug1.Sub(debugStart) + // projection.logger.Info(fmt.Sprintf("After projectValidatorView(): %s", debugDiff1)) + + if _, err = debugFile.WriteString(debugDiff1.String() + ","); err != nil { + panic(err) + } + if projectErr := projection.projectValidatorActivitiesView( validatorsView, validatorActivitiesView, @@ -119,6 +149,14 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error projecting validator activities view: %v", err) } + debug2 := time.Now() + debugDiff2 := debug2.Sub(debug1) + // projection.logger.Info(fmt.Sprintf("Time for projectValidatorActivitiesView: %s", debugDiff2)) + + if _, err = debugFile.WriteString(debugDiff2.String() + ","); err != nil { + panic(err) + } + validatorList, listValidatorErr := validatorsView.ListAll(view.ValidatorsListFilter{ MaybeStatuses: nil, }, view.ValidatorsListOrder{MaybePower: nil}) @@ -130,8 +168,19 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev validatorMap[validator.TendermintAddress] = &validatorList[i] } + debug3 := time.Now() + debugDiff3 := debug3.Sub(debug2) + // projection.logger.Info(fmt.Sprintf("Time for ListAll: %s", debugDiff3)) + + if _, err = debugFile.WriteString(debugDiff3.String() + ","); err != nil { + panic(err) + } + for _, event := range events { if blockCreatedEvent, ok := event.(*event_usecase.BlockCreated); ok { + + debugBlockCreatedStart := time.Now() + signatureCount := len(blockCreatedEvent.Block.Signatures) commitmentRows := make([]view.ValidatorBlockCommitmentRow, 0, signatureCount) @@ -185,6 +234,14 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error incrementing overall validator block commitments total: %v", err) } + debugBlockCreatedEnd1 := time.Now() + debugDiffBlockCreated1 := debugBlockCreatedEnd1.Sub(debugBlockCreatedStart) + // projection.logger.Info(fmt.Sprintf("BlockCreated insert commitment rows: %s", debugDiffBlockCreated1)) + + if _, err = debugFile.WriteString(debugDiffBlockCreated1.String() + ","); err != nil { + panic(err) + } + // Update validator up time activeValidators, activeValidatorsQueryErr := validatorsView.ListAll(view.ValidatorsListFilter{ MaybeStatuses: []constants.Status{constants.BONDED}, @@ -193,6 +250,7 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error querying active validators: %v", activeValidatorsQueryErr) } + var mutActiveValidators []view.ValidatorRow for _, activeValidator := range activeValidators { mutActiveValidator := activeValidator if commitmentMap[mutActiveValidator.ConsensusNodeAddress] { @@ -210,12 +268,33 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev new(big.Float).SetInt64(mutActiveValidator.TotalActiveBlock), ) - if activeValidatorUpdateErr := validatorsView.Update(&mutActiveValidator); activeValidatorUpdateErr != nil { - return fmt.Errorf("error updating active validators up time data: %v", activeValidatorUpdateErr) - } + mutActiveValidators = append(mutActiveValidators, mutActiveValidator) + } + + if activeValidatorUpdateErr := validatorsView.UpdateAllValidatorUpTime(mutActiveValidators); activeValidatorUpdateErr != nil { + return fmt.Errorf("error updating active validators up time data: %v", activeValidatorUpdateErr) + } + + debugBlockCreatedEnd2 := time.Now() + debugDiffBlockCreated2 := debugBlockCreatedEnd2.Sub(debugBlockCreatedEnd1) + // projection.logger.Info(fmt.Sprintf("BlockCreated update validator up time: %s", debugDiffBlockCreated2)) + + if _, err = debugFile.WriteString(debugDiffBlockCreated2.String() + ","); err != nil { + panic(err) + } + + debugBlockCreatedEnd := time.Now() + debugDiffBlockCreated := debugBlockCreatedEnd.Sub(debugBlockCreatedStart) + // projection.logger.Info(fmt.Sprintf("Total Time for BlockCreated: %s", debugDiffBlockCreated)) + + if _, err = debugFile.WriteString(debugDiffBlockCreated.String() + ","); err != nil { + panic(err) } } else if votedEvent, ok := event.(*event_usecase.MsgVote); ok { + + debugMsgVoteStart := time.Now() + projection.logger.Debug("handling MsgVote event") mutVotedValidator, votedValidatorQueryErr := validatorsView.FindBy(view.ValidatorIdentity{ @@ -234,6 +313,15 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev if votedValidatorUpdateErr := validatorsView.Update(mutVotedValidator); votedValidatorUpdateErr != nil { return fmt.Errorf("error updating voted validator: %v", votedValidatorUpdateErr) } + + debugMsgVoteEnd := time.Now() + debugDiffMsgVote := debugMsgVoteEnd.Sub(debugMsgVoteStart) + // projection.logger.Info(fmt.Sprintf("Total Time for MsgVote: %s", debugDiffMsgVote)) + + if _, err = debugFile.WriteString(debugDiffMsgVote.String() + ","); err != nil { + panic(err) + } + } } @@ -245,6 +333,17 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error committing changes: %v", err) } committed = true + + debugEnd := time.Now() + debugTotalRunningTime := debugEnd.Sub(debugStart) + // projection.logger.Info(fmt.Sprintf("Total: %s", debugTotalRunningTime)) + + if _, err = debugFile.WriteString(debugTotalRunningTime.String() + "\n"); err != nil { + panic(err) + } + + // time.Sleep(1 * time.Second) + return nil } diff --git a/projection/validator/view/validators.go b/projection/validator/view/validators.go index 55e973a5..35f1575d 100644 --- a/projection/validator/view/validators.go +++ b/projection/validator/view/validators.go @@ -241,6 +241,64 @@ func (validatorsView *Validators) Update(validator *ValidatorRow) error { return nil } +func (validatorsView *Validators) UpdateAllValidatorUpTime(validators []ValidatorRow) error { + + pendingRowCount := 0 + totalRowCount := len(validators) + + sql := "" + + for i, validator := range validators { + + if pendingRowCount == 0 { + + sql = `UPDATE view_validators AS view SET + total_signed_block = row.total_signed_block, + total_active_block = row.total_active_block, + imprecise_up_time = row.imprecise_up_time + FROM (VALUES + ` + } + + sql += fmt.Sprintf( + "(%d, %d, %d, %s),\n", + *validator.MaybeId, + validator.TotalSignedBlock, + validator.TotalActiveBlock, + validator.ImpreciseUpTime.String(), + ) + + pendingRowCount += 1 + + if pendingRowCount == 500 || i+1 == totalRowCount { + + sql = strings.TrimSuffix(sql, ",\n") + + sql += `) AS row( + id, + total_signed_block, + total_active_block, + imprecise_up_time + ) + WHERE row.id = view.id;` + + result, err := validatorsView.rdb.Exec(sql) + if err != nil { + return fmt.Errorf("error updating validators into the table: %v: %w", err, rdb.ErrWrite) + } + if result.RowsAffected() != int64(pendingRowCount) { + return fmt.Errorf("error updating validators into the table: wrong number of affected rows %d: %w", result.RowsAffected(), rdb.ErrWrite) + } + + pendingRowCount = 0 + + } + + } + + return nil +} + type ValidatorsListFilter struct { MaybeStatuses []constants.Status } From c38f8ccdcecd6de3a48b995a968853f94ff62dfe Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 8 Nov 2021 11:10:30 +0800 Subject: [PATCH 2/4] Removed debug trace --- projection/validator/validator.go | 92 ------------------------- projection/validator/view/validators.go | 4 +- 2 files changed, 2 insertions(+), 94 deletions(-) diff --git a/projection/validator/validator.go b/projection/validator/validator.go index 8ec61090..b7ba4526 100644 --- a/projection/validator/validator.go +++ b/projection/validator/validator.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math/big" - "os" "strconv" "time" @@ -35,16 +34,6 @@ type Validator struct { } func NewValidator(logger applogger.Logger, rdbConn rdb.Conn, conNodeAddressPrefix string) *Validator { - - debugFile, err := os.OpenFile("validator.csv", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - panic(err) - } - defer debugFile.Close() - if _, err = debugFile.WriteString("Height,projectValidatorView,projectValidatorActivitiesView,ListAll,BlockCreated-insertCommit,BlockCreated-updateValidator,BlockCreated-Total,Total\n"); err != nil { - panic(err) - } - return &Validator{ rdbprojectionbase.NewRDbBase(rdbConn.ToHandle(), "Validator"), @@ -82,17 +71,6 @@ func (projection *Validator) OnInit() error { } func (projection *Validator) HandleEvents(height int64, events []event_entity.Event) error { - - debugFile, err := os.OpenFile("validator.csv", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - panic(err) - } - defer debugFile.Close() - if _, err = debugFile.WriteString(strconv.FormatInt(height, 10) + ","); err != nil { - panic(err) - } - - debugStart := time.Now() rdbTx, err := projection.rdbConn.Begin() if err != nil { return fmt.Errorf("error beginning transaction: %v", err) @@ -130,14 +108,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error projecting validator view: %v", projectErr) } - debug1 := time.Now() - debugDiff1 := debug1.Sub(debugStart) - // projection.logger.Info(fmt.Sprintf("After projectValidatorView(): %s", debugDiff1)) - - if _, err = debugFile.WriteString(debugDiff1.String() + ","); err != nil { - panic(err) - } - if projectErr := projection.projectValidatorActivitiesView( validatorsView, validatorActivitiesView, @@ -149,14 +119,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error projecting validator activities view: %v", err) } - debug2 := time.Now() - debugDiff2 := debug2.Sub(debug1) - // projection.logger.Info(fmt.Sprintf("Time for projectValidatorActivitiesView: %s", debugDiff2)) - - if _, err = debugFile.WriteString(debugDiff2.String() + ","); err != nil { - panic(err) - } - validatorList, listValidatorErr := validatorsView.ListAll(view.ValidatorsListFilter{ MaybeStatuses: nil, }, view.ValidatorsListOrder{MaybePower: nil}) @@ -168,19 +130,9 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev validatorMap[validator.TendermintAddress] = &validatorList[i] } - debug3 := time.Now() - debugDiff3 := debug3.Sub(debug2) - // projection.logger.Info(fmt.Sprintf("Time for ListAll: %s", debugDiff3)) - - if _, err = debugFile.WriteString(debugDiff3.String() + ","); err != nil { - panic(err) - } - for _, event := range events { if blockCreatedEvent, ok := event.(*event_usecase.BlockCreated); ok { - debugBlockCreatedStart := time.Now() - signatureCount := len(blockCreatedEvent.Block.Signatures) commitmentRows := make([]view.ValidatorBlockCommitmentRow, 0, signatureCount) @@ -234,14 +186,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error incrementing overall validator block commitments total: %v", err) } - debugBlockCreatedEnd1 := time.Now() - debugDiffBlockCreated1 := debugBlockCreatedEnd1.Sub(debugBlockCreatedStart) - // projection.logger.Info(fmt.Sprintf("BlockCreated insert commitment rows: %s", debugDiffBlockCreated1)) - - if _, err = debugFile.WriteString(debugDiffBlockCreated1.String() + ","); err != nil { - panic(err) - } - // Update validator up time activeValidators, activeValidatorsQueryErr := validatorsView.ListAll(view.ValidatorsListFilter{ MaybeStatuses: []constants.Status{constants.BONDED}, @@ -275,26 +219,8 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error updating active validators up time data: %v", activeValidatorUpdateErr) } - debugBlockCreatedEnd2 := time.Now() - debugDiffBlockCreated2 := debugBlockCreatedEnd2.Sub(debugBlockCreatedEnd1) - // projection.logger.Info(fmt.Sprintf("BlockCreated update validator up time: %s", debugDiffBlockCreated2)) - - if _, err = debugFile.WriteString(debugDiffBlockCreated2.String() + ","); err != nil { - panic(err) - } - - debugBlockCreatedEnd := time.Now() - debugDiffBlockCreated := debugBlockCreatedEnd.Sub(debugBlockCreatedStart) - // projection.logger.Info(fmt.Sprintf("Total Time for BlockCreated: %s", debugDiffBlockCreated)) - - if _, err = debugFile.WriteString(debugDiffBlockCreated.String() + ","); err != nil { - panic(err) - } - } else if votedEvent, ok := event.(*event_usecase.MsgVote); ok { - debugMsgVoteStart := time.Now() - projection.logger.Debug("handling MsgVote event") mutVotedValidator, votedValidatorQueryErr := validatorsView.FindBy(view.ValidatorIdentity{ @@ -314,14 +240,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error updating voted validator: %v", votedValidatorUpdateErr) } - debugMsgVoteEnd := time.Now() - debugDiffMsgVote := debugMsgVoteEnd.Sub(debugMsgVoteStart) - // projection.logger.Info(fmt.Sprintf("Total Time for MsgVote: %s", debugDiffMsgVote)) - - if _, err = debugFile.WriteString(debugDiffMsgVote.String() + ","); err != nil { - panic(err) - } - } } @@ -334,16 +252,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev } committed = true - debugEnd := time.Now() - debugTotalRunningTime := debugEnd.Sub(debugStart) - // projection.logger.Info(fmt.Sprintf("Total: %s", debugTotalRunningTime)) - - if _, err = debugFile.WriteString(debugTotalRunningTime.String() + "\n"); err != nil { - panic(err) - } - - // time.Sleep(1 * time.Second) - return nil } diff --git a/projection/validator/view/validators.go b/projection/validator/view/validators.go index 35f1575d..97e1c2fd 100644 --- a/projection/validator/view/validators.go +++ b/projection/validator/view/validators.go @@ -284,10 +284,10 @@ func (validatorsView *Validators) UpdateAllValidatorUpTime(validators []Validato result, err := validatorsView.rdb.Exec(sql) if err != nil { - return fmt.Errorf("error updating validators into the table: %v: %w", err, rdb.ErrWrite) + return fmt.Errorf("error updating validators up time into the table: %v: %w", err, rdb.ErrWrite) } if result.RowsAffected() != int64(pendingRowCount) { - return fmt.Errorf("error updating validators into the table: wrong number of affected rows %d: %w", result.RowsAffected(), rdb.ErrWrite) + return fmt.Errorf("error updating validators up time into the table: wrong number of affected rows %d: %w", result.RowsAffected(), rdb.ErrWrite) } pendingRowCount = 0 From 01bdb0cd5803012a8f24950742c675fa986960c3 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 8 Nov 2021 11:15:01 +0800 Subject: [PATCH 3/4] Removed config file from chain-indexing repo --- config/config.mainnet.toml | 94 -------------------------------------- 1 file changed, 94 deletions(-) delete mode 100644 config/config.mainnet.toml diff --git a/config/config.mainnet.toml b/config/config.mainnet.toml deleted file mode 100644 index 3a715b1b..00000000 --- a/config/config.mainnet.toml +++ /dev/null @@ -1,94 +0,0 @@ -[blockchain] -bonding_denom = "basecro" -account_address_prefix = "cro" -account_pubkey_prefix = "cro" -validator_address_prefix = "crocncl" -validator_pubkey_prefix = "crocncl" -connode_address_prefix = "crocnclcons" -connode_pubkey_prefix = "crocnclconspub" - -[cosmos_version_enabled_height] -# chain-maind v2.x.x starting height in mainnet -v0_42_7 = 922362 - -[system] -# mode of the system, possible values: EVENT_STORE,TENDERMINT_DIRECT -# EVENT_STORE mode: synced blocks are parsed to events and persist to event store. Projections will replay events from -# event store. -# TENDERMINT_DIRECT mode: synced blocks are parsed to events and are replayed directly by projections. -# API_ONLY mode: indexing is disabled and provide API service only -mode = "TENDERMINT_DIRECT" - -[sync] -# how many sync jobs running in parallel -window_size = 50 - -[tendermint] -http_rpc_url = "https://mainnet.crypto.org:26657" -insecure = false -# When enabled, genssi parsing will reject any non-Cosmos SDK built-in module -# inside genesis file. -strict_genesis_parsing = false - -[cosmosapp] -http_rpc_url = "https://mainnet.crypto.org:1317" -insecure = false - -[http] -listening_address = "0.0.0.0:8080" -route_prefix = "/" -# A list of origins a cross-domain request is allowed to be requested from -# Default value '[]' disables CORS support -# Use '["*"]' to allow request from any origin -cors_allowed_origins = [] -cors_allowed_methods = ["HEAD", "GET"] -cors_allowed_headers = ["Origin", "Accept", "Content-Type", "X-Requested-With", "X-Server-Time"] - -[debug] -pprof_enable = false -pprof_listening_address = "0.0.0.0:3000" - -[database] -host = "localhost" -port = 5432 -username = "postgres" -# password can only be provided through CLI or Environment variable `DB_PASSWORD` -name = "postgres" -schema = "public" -ssl = true - -[postgres] -pool_max_conns = 100 -pool_min_conns = 0 -pool_max_conn_lifetime = "1h" -pool_max_conn_idle_time = "30m" -pool_health_check_interval = "1m" - -[logger] -# comma separated log levels. possible values: debug,info,error,panic -level = "debug" -color = false - -[projection] -enables = [ - "AccountMessage", - "AccountTransaction", - "Block", - "BlockEvent", - "ChainStats", - "Proposal", - "Transaction", - "Validator", - "ValidatorStats", - "NFT", -# "CryptoComNFT", - "IBCChannel", -# "IBCChannelTxMsgTrace", - "IBCChannelMessage", -# "BridgePendingActivity" -] - -[cronjob] -enables = [ -# "BridgeActivityMatcher" -] From 47ec132917e8d128d3c2c374b6e3da1a623a2fd3 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 8 Nov 2021 12:19:03 +0800 Subject: [PATCH 4/4] Removed extra empty line --- projection/validator/validator.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/projection/validator/validator.go b/projection/validator/validator.go index b7ba4526..d10d3ec4 100644 --- a/projection/validator/validator.go +++ b/projection/validator/validator.go @@ -132,7 +132,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev for _, event := range events { if blockCreatedEvent, ok := event.(*event_usecase.BlockCreated); ok { - signatureCount := len(blockCreatedEvent.Block.Signatures) commitmentRows := make([]view.ValidatorBlockCommitmentRow, 0, signatureCount) @@ -220,7 +219,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev } } else if votedEvent, ok := event.(*event_usecase.MsgVote); ok { - projection.logger.Debug("handling MsgVote event") mutVotedValidator, votedValidatorQueryErr := validatorsView.FindBy(view.ValidatorIdentity{ @@ -239,7 +237,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev if votedValidatorUpdateErr := validatorsView.Update(mutVotedValidator); votedValidatorUpdateErr != nil { return fmt.Errorf("error updating voted validator: %v", votedValidatorUpdateErr) } - } } @@ -251,7 +248,6 @@ func (projection *Validator) HandleEvents(height int64, events []event_entity.Ev return fmt.Errorf("error committing changes: %v", err) } committed = true - return nil }