Skip to content

Commit

Permalink
Merge pull request #3275 from filecoin-project/fix/chainwatch/sync
Browse files Browse the repository at this point in the history
fix(chainwatch): use height to determine unsynced blocks and fix deadlock in sector deal table
  • Loading branch information
magik6k authored Aug 28, 2020
2 parents 1d9b1f4 + fa6f9f4 commit 932ab61
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 304 deletions.
48 changes: 0 additions & 48 deletions cmd/lotus-chainwatch/processor/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip
log.Fatalw("Failed to persist market actors", "error", err)
}

// we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first
if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil {
close(p.sectorDealEvents)
return err
}

if err := p.updateMarket(ctx, marketChanges); err != nil {
log.Fatalw("Failed to update market actors", "error", err)
}
Expand Down Expand Up @@ -272,48 +266,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip

}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
start := time.Now()
defer func() {
Expand Down
13 changes: 0 additions & 13 deletions cmd/lotus-chainwatch/processor/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package processor
import (
"context"
"sync"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -120,10 +119,6 @@ func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[c
}

func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Receipts", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -164,10 +159,6 @@ create temp table recs (like receipts excluding constraints) on commit drop;
}

func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Message Inclusions", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -206,10 +197,6 @@ create temp table mi (like block_messages excluding constraints) on commit drop;
}

func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Messages", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down
52 changes: 49 additions & 3 deletions cmd/lotus-chainwatch/processor/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
preCommitEvents := make(chan *MinerSectorsEvent, 8)
sectorEvents := make(chan *MinerSectorsEvent, 8)
partitionEvents := make(chan *MinerSectorsEvent, 8)
p.sectorDealEvents = make(chan *SectorDealEvent, 8)
dealEvents := make(chan *SectorDealEvent, 8)

grp.Go(func() error {
return p.storePreCommitDealInfo(dealEvents)
})

grp.Go(func() error {
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
Expand All @@ -280,9 +284,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
grp.Go(func() error {
defer func() {
close(preCommitEvents)
close(p.sectorDealEvents)
close(dealEvents)
}()
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents)
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents)
})

grp.Go(func() error {
Expand Down Expand Up @@ -911,6 +915,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine
return tx.Commit()
}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
start := time.Now()
defer func() {
Expand Down
2 changes: 0 additions & 2 deletions cmd/lotus-chainwatch/processor/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ func (p *Processor) subMpool(ctx context.Context) {
msgs[v.Message.Message.Cid()] = &v.Message.Message
}

log.Debugf("Processing %d mpool updates", len(msgs))

err := p.storeMessages(msgs)
if err != nil {
log.Error(err)
Expand Down
104 changes: 78 additions & 26 deletions cmd/lotus-chainwatch/processor/power.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"golang.org/x/xerrors"

"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
Expand All @@ -15,7 +16,19 @@ import (
type powerActorInfo struct {
common actorInfo

epochSmoothingEstimate *smoothing.FilterEstimate
totalRawBytes big.Int
totalRawBytesCommitted big.Int
totalQualityAdjustedBytes big.Int
totalQualityAdjustedBytesCommitted big.Int
totalPledgeCollateral big.Int

newRawBytes big.Int
newQualityAdjustedBytes big.Int
newPledgeCollateral big.Int
newQAPowerSmoothed *smoothing.FilterEstimate

minerCount int64
minerCountAboveMinimumPower int64
}

func (p *Processor) setupPower() error {
Expand All @@ -25,13 +38,27 @@ func (p *Processor) setupPower() error {
}

if _, err := tx.Exec(`
create table if not exists power_smoothing_estimates
create table if not exists chain_power
(
state_root text not null
constraint power_smoothing_estimates_pk
primary key,
position_estimate text not null,
velocity_estimate text not null
state_root text not null
constraint power_smoothing_estimates_pk
primary key,
new_raw_bytes_power text not null,
new_qa_bytes_power text not null,
new_pledge_collateral text not null,
total_raw_bytes_power text not null,
total_raw_bytes_committed text not null,
total_qa_bytes_power text not null,
total_qa_bytes_committed text not null,
total_pledge_collateral text not null,
qa_smoothed_position_estimate text not null,
qa_smoothed_velocity_estimate text not null,
miner_count int not null,
minimum_consensus_miner_count int not null
);
`); err != nil {
return err
Expand Down Expand Up @@ -60,8 +87,8 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips)
}()

var out []powerActorInfo
for tipset, powers := range powerTips {
for _, act := range powers {
for tipset, powerStates := range powerTips {
for _, act := range powerStates {
var pw powerActorInfo
pw.common = act

Expand All @@ -80,54 +107,79 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips)
return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err)
}

pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed
pw.totalRawBytes = powerActorState.TotalRawBytePower
pw.totalRawBytesCommitted = powerActorState.TotalBytesCommitted
pw.totalQualityAdjustedBytes = powerActorState.TotalQualityAdjPower
pw.totalQualityAdjustedBytesCommitted = powerActorState.TotalQABytesCommitted
pw.totalPledgeCollateral = powerActorState.TotalPledgeCollateral

pw.newRawBytes = powerActorState.ThisEpochRawBytePower
pw.newQualityAdjustedBytes = powerActorState.ThisEpochQualityAdjPower
pw.newPledgeCollateral = powerActorState.ThisEpochPledgeCollateral
pw.newQAPowerSmoothed = powerActorState.ThisEpochQAPowerSmoothed

pw.minerCount = powerActorState.MinerCount
pw.minerCountAboveMinimumPower = powerActorState.MinerAboveMinPowerCount
out = append(out, pw)
}
}

return out, nil
}

func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error {
func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error {
// NB: use errgroup when there is more than a single store operation
return p.storePowerSmoothingEstimates(powers)
return p.storePowerSmoothingEstimates(powerStates)
}

func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error {
func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error {
tx, err := p.db.Begin()
if err != nil {
return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err)
return xerrors.Errorf("begin chain_power tx: %w", err)
}

if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil {
return xerrors.Errorf("prep power_smoothing_estimates: %w", err)
if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil {
return xerrors.Errorf("prep chain_power: %w", err)
}

stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
stmt, err := tx.Prepare(`copy cp (state_root, new_raw_bytes_power, new_qa_bytes_power, new_pledge_collateral, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`)
if err != nil {
return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err)
return xerrors.Errorf("prepare tmp chain_power: %w", err)
}

for _, powerState := range powers {
for _, ps := range powerStates {
if _, err := stmt.Exec(
powerState.common.stateroot.String(),
powerState.epochSmoothingEstimate.PositionEstimate.String(),
powerState.epochSmoothingEstimate.VelocityEstimate.String(),
ps.common.stateroot.String(),
ps.newRawBytes.String(),
ps.newQualityAdjustedBytes.String(),
ps.newPledgeCollateral.String(),

ps.totalRawBytes.String(),
ps.totalRawBytesCommitted.String(),
ps.totalQualityAdjustedBytes.String(),
ps.totalQualityAdjustedBytesCommitted.String(),
ps.totalPledgeCollateral.String(),

ps.newQAPowerSmoothed.PositionEstimate.String(),
ps.newQAPowerSmoothed.VelocityEstimate.String(),

ps.minerCount,
ps.minerCountAboveMinimumPower,
); err != nil {
return xerrors.Errorf("failed to store smoothing estimate: %w", err)
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err)
return xerrors.Errorf("close prepared chain_power: %w", err)
}

if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err)
if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert chain_power from tmp: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err)
return xerrors.Errorf("commit chain_power tx: %w", err)
}

return nil
Expand Down
Loading

0 comments on commit 932ab61

Please sign in to comment.