forked from erigontech/erigon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstage.go
177 lines (154 loc) · 6.18 KB
/
stage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package stagedsync
import (
"context"
"encoding/binary"
"fmt"
"time"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/log/v3"
)
// ExecutePayloadFunc is the signature of a callback that receives a kv.RwTx
// (batch), a header with its raw body, an unwind point, and two slices holding
// a chain of headers and a chain of raw bodies. Failure is reported through the
// returned error.
// NOTE(review): how unwindPoint, headersChain and bodiesChain are used is not
// visible in this file - confirm against the implementation that is assigned
// to this type.
type ExecutePayloadFunc func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody) error
// ExecFunc is the execution function for the stage to move forward.
// * firstCycle - presumably true during the first sync cycle; TODO confirm against the caller.
// * badBlockUnwind - presumably tells the stage that the cycle follows an unwind caused by a bad block; TODO confirm.
// * s - is the current state of the stage and contains stage data.
// * unwinder - if the stage needs to cause unwinding, `unwinder` methods can be used.
// * tx - the kv.RwTx handed to the stage for this run.
type ExecFunc func(firstCycle bool, badBlockUnwind bool, s *StageState, unwinder Unwinder, tx kv.RwTx) error
// UnwindFunc is the unwinding logic of the stage.
// * firstCycle - presumably true during the first sync cycle; TODO confirm against the caller.
// * u - contains information about the unwind itself (see UnwindState).
// * s - represents the state of this stage at the beginning of unwind.
// * tx - the kv.RwTx handed to the stage for this unwind.
type UnwindFunc func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error
// PruneFunc is the execution function for the stage to prune old data.
// * firstCycle - presumably true during the first sync cycle; TODO confirm against the caller.
// * p - is the current prune state of the stage (see PruneState).
// * tx - the kv.RwTx handed to the stage for this prune run.
type PruneFunc func(firstCycle bool, p *PruneState, tx kv.RwTx) error
// Stage is a single sync stage in staged sync.
type Stage struct {
// Description is a string that is shown in the logs.
Description string
// DisabledDescription shows in the log with a message if the stage is disabled. Here, you can show which command line flags should be provided to enable the stage.
DisabledDescription string
// Forward is called when the stage is executed. The main logic of the stage should be here. Should always end with `s.Done()` to allow going to the next stage. MUST NOT be nil!
// NOTE(review): no `Done` method is declared on StageState in this file (only LogPrefix, Update, UpdatePrune and ExecutionAt) - the `s.Done()` sentence may be stale; confirm.
Forward ExecFunc
// Unwind is called when the stage should be unwound. The unwind logic should be here. MUST NOT be nil!
Unwind UnwindFunc
// Prune is the stage's PruneFunc (see PruneFunc).
// NOTE(review): unlike Forward and Unwind, no "MUST NOT be nil" requirement is stated for it - confirm whether nil is allowed.
Prune PruneFunc
// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`).
ID stages.SyncStage
// Disabled defines if the stage is disabled. It is set when the stage is built by its `StageBuilder`.
Disabled bool
}
// StageState is the state of the stage.
type StageState struct {
// state is the Sync this stage state belongs to; LogPrefix delegates to it.
state *Sync
// ID is the stage under which Update and UpdatePrune save progress.
ID stages.SyncStage
BlockNumber uint64 // BlockNumber is the current block number of the stage at the beginning of the state execution.
}
// LogPrefix returns the log prefix reported by the Sync this stage state belongs to.
func (s *StageState) LogPrefix() string { return s.state.LogPrefix() }
// Update stores newBlockNum as the progress of this stage in the database
// through stages.SaveStageProgress. When syncMetrics holds an entry for the
// stage ID, that entry is set to newBlockNum before the write.
// Can be called multiple times during stage execution.
func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error {
	metric, registered := syncMetrics[s.ID]
	if registered {
		metric.Set(newBlockNum)
	}

	return stages.SaveStageProgress(db, s.ID, newBlockNum)
}
// UpdatePrune stores blockNum as the prune progress of this stage in the
// database through stages.SaveStagePruneProgress.
func (s *StageState) UpdatePrune(db kv.Putter, blockNum uint64) error {
return stages.SaveStagePruneProgress(db, s.ID, blockNum)
}
// ExecutionAt reports the progress recorded for the "Execution" stage
// (stages.Execution), i.e. which block is currently executed. Both the value
// and the error come straight from stages.GetStageProgress.
func (s *StageState) ExecutionAt(db kv.Getter) (uint64, error) {
	return stages.GetStageProgress(db, stages.Execution)
}
// Unwinder allows the stage to cause an unwind.
type Unwinder interface {
// UnwindTo begins staged sync unwind to the specified block.
// * unwindPoint - the block to unwind to.
// * badBlock - hash of the block that caused the unwind; presumably the empty hash when the unwind is not caused by a bad block (cf. UnwindState.BadBlock) - confirm with implementers.
UnwindTo(unwindPoint uint64, badBlock common.Hash)
}
// UnwindState contains the information about unwind.
type UnwindState struct {
// ID is the stage under which Done saves the unwind point as progress.
ID stages.SyncStage
// UnwindPoint is the block to unwind to.
UnwindPoint uint64
// CurrentBlockNumber is presumably the progress of the stage when the unwind started - TODO confirm; it is not read anywhere in this file.
CurrentBlockNumber uint64
// If unwind is caused by a bad block, this hash is not empty
BadBlock common.Hash
// state is the Sync this unwind state belongs to; LogPrefix delegates to it.
state *Sync
}
// LogPrefix returns the log prefix reported by the Sync this unwind state belongs to.
func (u *UnwindState) LogPrefix() string { return u.state.LogPrefix() }
// Done updates the DB state of the stage: it stores UnwindPoint as the
// progress of stage ID through stages.SaveStageProgress.
func (u *UnwindState) Done(db kv.Putter) error {
return stages.SaveStageProgress(db, u.ID, u.UnwindPoint)
}
// PruneState contains the information about pruning of a stage.
type PruneState struct {
// ID is the stage under which Done and DoneAt save prune progress.
ID stages.SyncStage
ForwardProgress uint64 // progress of stage forward move
PruneProgress uint64 // progress of stage prune move. After a sync cycle it becomes equal to ForwardProgress by the Done() method
// state is the Sync this prune state belongs to; LogPrefix delegates to it.
state *Sync
}
// LogPrefix returns the log prefix reported by the owning Sync with " Prune" appended.
func (s *PruneState) LogPrefix() string { return s.state.LogPrefix() + " Prune" }
// Done stores ForwardProgress as the prune progress of stage ID through
// stages.SaveStagePruneProgress.
func (s *PruneState) Done(db kv.Putter) error {
return stages.SaveStagePruneProgress(db, s.ID, s.ForwardProgress)
}
// DoneAt stores blockNum (instead of ForwardProgress, as Done does) as the
// prune progress of stage ID through stages.SaveStagePruneProgress.
func (s *PruneState) DoneAt(db kv.Putter, blockNum uint64) error {
return stages.SaveStagePruneProgress(db, s.ID, blockNum)
}
// PruneTable walks `table` with a read-write cursor starting at the first key
// and deletes entries one by one. The first 8 bytes of every key are decoded
// as a big-endian block number; the walk stops at the first key whose block
// number is >= pruneTo.
//
// PruneTable has `limit` parameter to avoid too large data deletes per one sync
// cycle - better delete by small portions to reduce db.FreeList size. At most
// `limit` entries are deleted per call.
//
// Errors:
//   - a wrapped error if the cursor cannot be created or an entry cannot be deleted;
//   - the cursor error as-is if First/Next fails;
//   - an error if a key is shorter than 8 bytes and so cannot hold a block number;
//   - libcommon.ErrStopped if ctx is done before an entry is deleted.
func PruneTable(tx kv.RwTx, table string, pruneTo uint64, ctx context.Context, limit int) error {
	c, err := tx.RwCursor(table)
	if err != nil {
		return fmt.Errorf("failed to create cursor for pruning %w", err)
	}
	defer c.Close()

	visited := 0
	for k, _, err := c.First(); ; k, _, err = c.Next() {
		// The error is checked before the key: a cursor may report a failure
		// together with a nil key, and testing `k != nil` in the loop condition
		// would end the loop and silently drop that error.
		if err != nil {
			return err
		}
		if k == nil {
			break
		}

		visited++
		if visited > limit {
			break
		}

		// binary.BigEndian.Uint64 panics on slices shorter than 8 bytes.
		if len(k) < 8 {
			return fmt.Errorf("unexpected key %x in %s: shorter than 8 bytes", k, table)
		}
		blockNum := binary.BigEndian.Uint64(k)
		if blockNum >= pruneTo {
			break
		}

		// Non-blocking cancellation check before every delete.
		select {
		case <-ctx.Done():
			return libcommon.ErrStopped
		default:
		}

		if err = c.DeleteCurrent(); err != nil {
			return fmt.Errorf("failed to remove for block %d: %w", blockNum, err)
		}
	}
	return nil
}
// PruneTableDupSort walks the dup-sorted `table` with a read-write cursor,
// moving from key to key with NextNoDup, and deletes every key together with
// all of its duplicates. The first 8 bytes of every key are decoded as a
// big-endian block number; the walk stops at the first key whose block number
// is >= pruneTo.
//
// Whenever logEvery has a tick ready, the current table and block are logged
// under "[logPrefix]". logEvery must not be nil.
//
// Errors:
//   - a wrapped error if the cursor cannot be created, cannot be moved, or the
//     duplicates of a key cannot be deleted;
//   - an error if a key is shorter than 8 bytes and so cannot hold a block number;
//   - libcommon.ErrStopped if ctx is done before a key is deleted.
func PruneTableDupSort(tx kv.RwTx, table string, logPrefix string, pruneTo uint64, logEvery *time.Ticker, ctx context.Context) error {
	c, err := tx.RwCursorDupSort(table)
	if err != nil {
		return fmt.Errorf("failed to create cursor for pruning %w", err)
	}
	defer c.Close()

	for k, _, err := c.First(); ; k, _, err = c.NextNoDup() {
		// The error is checked before the key: a cursor may report a failure
		// together with a nil key, and testing `k != nil` in the loop condition
		// would end the loop and silently drop that error.
		if err != nil {
			return fmt.Errorf("failed to move %s cleanup cursor: %w", table, err)
		}
		if k == nil {
			break
		}

		// binary.BigEndian.Uint64 panics on slices shorter than 8 bytes.
		if len(k) < 8 {
			return fmt.Errorf("unexpected key %x in %s: shorter than 8 bytes", k, table)
		}
		blockNum := binary.BigEndian.Uint64(k)
		if blockNum >= pruneTo {
			break
		}

		// Non-blocking: log on a ready tick, stop on cancellation, otherwise
		// go straight to the delete.
		select {
		case <-logEvery.C:
			log.Info(fmt.Sprintf("[%s]", logPrefix), "table", table, "block", blockNum)
		case <-ctx.Done():
			return libcommon.ErrStopped
		default:
		}

		if err = c.DeleteCurrentDuplicates(); err != nil {
			return fmt.Errorf("failed to remove for block %d: %w", blockNum, err)
		}
	}
	return nil
}