-
Notifications
You must be signed in to change notification settings - Fork 500
/
signers_processor.go
157 lines (132 loc) · 4 KB
/
signers_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package processors
import (
"context"
"github.com/guregu/null"
"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
type SignersProcessor struct {
signersQ history.QSigners
cache *ingest.ChangeCompactor
batch history.AccountSignersBatchInsertBuilder
// insertOnlyMode is a mode in which we don't use ledger cache and we just
// add signers to a batch, then we Exec all signers in one insert query.
// This is done to make history buckets processing faster (batch inserting).
useLedgerEntryCache bool
}
func NewSignersProcessor(
signersQ history.QSigners, useLedgerEntryCache bool,
) *SignersProcessor {
p := &SignersProcessor{signersQ: signersQ, useLedgerEntryCache: useLedgerEntryCache}
p.reset()
return p
}
func (p *SignersProcessor) reset() {
p.batch = p.signersQ.NewAccountSignersBatchInsertBuilder(maxBatchSize)
p.cache = ingest.NewChangeCompactor()
}
func (p *SignersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
if change.Type != xdr.LedgerEntryTypeAccount {
return nil
}
if p.useLedgerEntryCache {
err := p.cache.AddChange(change)
if err != nil {
return errors.Wrap(err, "error adding to ledgerCache")
}
if p.cache.Size() > maxBatchSize {
err = p.Commit(ctx)
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}
return nil
}
if !(change.Pre == nil && change.Post != nil) {
return errors.New("AssetStatsProSignersProcessorcessor is in insert only mode")
}
accountEntry := change.Post.Data.MustAccount()
account := accountEntry.AccountId.Address()
sponsors := accountEntry.SponsorPerSigner()
for signer, weight := range accountEntry.SignerSummary() {
var sponsor null.String
if sponsorDesc, isSponsored := sponsors[signer]; isSponsored {
sponsor = null.StringFrom(sponsorDesc.Address())
}
err := p.batch.Add(ctx, history.AccountSigner{
Account: account,
Signer: signer,
Weight: weight,
Sponsor: sponsor,
})
if err != nil {
return errors.Wrap(err, "Error adding row to accountSignerBatch")
}
}
return nil
}
func (p *SignersProcessor) Commit(ctx context.Context) error {
if !p.useLedgerEntryCache {
return p.batch.Exec(ctx)
}
changes := p.cache.GetChanges()
for _, change := range changes {
if !change.AccountSignersChanged() {
continue
}
// The code below removes all Pre signers adds Post signers but
// can be improved by finding a diff (check performance first).
if change.Pre != nil {
preAccountEntry := change.Pre.Data.MustAccount()
for signer := range preAccountEntry.SignerSummary() {
rowsAffected, err := p.signersQ.RemoveAccountSigner(ctx, preAccountEntry.AccountId.Address(), signer)
if err != nil {
return errors.Wrap(err, "Error removing a signer")
}
if rowsAffected != 1 {
return ingest.NewStateError(errors.Errorf(
"Expected account=%s signer=%s in database but not found when removing (rows affected = %d)",
preAccountEntry.AccountId.Address(),
signer,
rowsAffected,
))
}
}
}
if change.Post != nil {
postAccountEntry := change.Post.Data.MustAccount()
sponsorsPerSigner := postAccountEntry.SponsorPerSigner()
for signer, weight := range postAccountEntry.SignerSummary() {
// Ignore master key
var sponsor *string
if signer != postAccountEntry.AccountId.Address() {
if s, ok := sponsorsPerSigner[signer]; ok {
a := s.Address()
sponsor = &a
}
}
rowsAffected, err := p.signersQ.CreateAccountSigner(ctx,
postAccountEntry.AccountId.Address(),
signer,
weight,
sponsor,
)
if err != nil {
return errors.Wrapf(err, "Error inserting a signer (%s)", signer)
}
if rowsAffected != 1 {
return ingest.NewStateError(errors.Errorf(
"%d rows affected when inserting account=%s signer=%s to database",
rowsAffected,
postAccountEntry.AccountId.Address(),
signer,
))
}
}
}
}
return nil
}