-
Notifications
You must be signed in to change notification settings - Fork 500
/
offers.go
216 lines (181 loc) · 6.35 KB
/
offers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package history
import (
"context"
"database/sql"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/stellar/go/support/errors"
)
// offersBatchSize is the row limit StreamAllOffers passes to each
// streamAllOffersBatch query.
const offersBatchSize = 50000
// QOffers defines offer related queries.
// The per-method notes below describe the *Q implementation in this file.
type QOffers interface {
// StreamAllOffers invokes callback once for every offer not marked as
// deleted. The *Q implementation refuses to run outside a read-only,
// repeatable read transaction.
StreamAllOffers(ctx context.Context, callback func(Offer) error) error
// GetOffersByIDs returns the non-deleted offers whose offer_id is in ids.
GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error)
// CountOffers returns the number of offers not marked as deleted.
CountOffers(ctx context.Context) (int, error)
// GetUpdatedOffers returns the offers whose last_modified_ledger is greater
// than newerThanSequence; rows marked as deleted are not filtered out.
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
// UpsertOffers upserts the given offers, keyed by offer_id.
UpsertOffers(ctx context.Context, offers []Offer) error
// CompactOffers deletes rows marked as deleted whose last_modified_ledger is
// at most cutOffSequence and returns the number of rows affected.
CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error)
}
// CountOffers returns the number of rows in the `offers` table that are not
// marked as deleted.
func (q *Q) CountOffers(ctx context.Context) (int, error) {
	// Named countQuery rather than sql so the database/sql import is not shadowed.
	countQuery := sq.Select("count(*)").
		From("offers").
		Where("deleted = ?", false)

	var total int
	err := q.Get(ctx, &total, countQuery)
	if err != nil {
		return 0, errors.Wrap(err, "could not run select query")
	}
	return total, nil
}
// GetOfferByID loads the non-deleted row from the `offers` table whose
// offer_id equals id. The error from the underlying query is returned as is,
// without wrapping.
func (q *Q) GetOfferByID(ctx context.Context, id int64) (Offer, error) {
	byID := selectOffers.
		Where("deleted = ?", false).
		Where("offers.offer_id = ?", id)

	var found Offer
	if err := q.Get(ctx, &found, byID); err != nil {
		return found, err
	}
	return found, nil
}
// GetOffersByIDs loads the non-deleted rows from the `offers` table whose
// offer_id is one of ids. The error from the underlying query is returned as
// is, without wrapping.
func (q *Q) GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error) {
	idFilter := map[string]interface{}{"offers.offer_id": ids}
	byIDs := selectOffers.
		Where("deleted = ?", false).
		Where(idFilter)

	var found []Offer
	if err := q.Select(ctx, &found, byIDs); err != nil {
		return found, err
	}
	return found, nil
}
// GetOffers loads non-deleted rows from `offers`, paged by offers.offer_id
// according to query.PageQuery. The SellerID, Selling, Buying and Sponsor
// fields of query each add an equality filter when they are set.
func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) {
	paged, err := query.PageQuery.ApplyTo(
		selectOffers.Where("deleted = ?", false),
		"offers.offer_id",
	)
	if err != nil {
		return nil, errors.Wrap(err, "could not apply query to page")
	}

	// Optional filters, applied in this order only when the field is set.
	filters := []struct {
		enabled bool
		clause  string
		value   interface{}
	}{
		{query.SellerID != "", "offers.seller_id = ?", query.SellerID},
		{query.Selling != nil, "offers.selling_asset = ?", query.Selling},
		{query.Buying != nil, "offers.buying_asset = ?", query.Buying},
		{query.Sponsor != "", "offers.sponsor = ?", query.Sponsor},
	}
	for _, f := range filters {
		if f.enabled {
			paged = paged.Where(f.clause, f.value)
		}
	}

	var page []Offer
	if err = q.Select(ctx, &page, paged); err != nil {
		return nil, errors.Wrap(err, "could not run select query")
	}
	return page, nil
}
// StreamAllOffers passes every non-deleted offer to callback, reading the
// table in batches of offersBatchSize rows ordered by offer_id.
//
// It must be called inside a read-only transaction with repeatable read
// isolation; otherwise it returns an error without querying. Any error from a
// batch (including one returned by callback) stops the stream and is returned.
func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
	if q.GetTx() == nil {
		return errors.New("cannot be called outside of a transaction")
	}

	opts := q.GetTxOptions()
	repeatableReadOnly := opts != nil && opts.ReadOnly && opts.Isolation == sql.LevelRepeatableRead
	if !repeatableReadOnly {
		return errors.New("should only be called in a repeatable read transaction")
	}

	// cursor is the highest offer ID streamed so far; a batch that does not
	// advance it means there are no more rows.
	for cursor := int64(0); ; {
		next, err := q.streamAllOffersBatch(ctx, cursor, offersBatchSize, callback)
		if err != nil {
			return err
		}
		if next == cursor {
			return nil
		}
		cursor = next
	}
}
// streamAllOffersBatch streams one batch of non-deleted offers to callback.
//
// It selects at most limit offers whose offer_id is greater than lastID, in
// ascending offer_id order, and calls callback for each row. On success it
// returns the offer ID of the last row handed to callback, or lastID unchanged
// when the query produced no rows.
//
// On any failure it returns 0 and a non-nil error. Query, scan and row
// iteration errors are wrapped with context; an error returned by callback is
// passed through untouched so callers can recognise their own errors.
func (q *Q) streamAllOffersBatch(ctx context.Context, lastID int64, limit uint64, callback func(Offer) error) (int64, error) {
	var rows *sqlx.Rows
	var err error

	rows, err = q.Query(ctx, selectOffers.
		Where("deleted = ?", false).
		Where("offer_id > ? ", lastID).
		OrderBy("offer_id asc").Limit(limit))
	if err != nil {
		return 0, errors.Wrap(err, "could not run all offers select query")
	}
	defer rows.Close()

	for rows.Next() {
		offer := Offer{}
		if err = rows.StructScan(&offer); err != nil {
			return 0, errors.Wrap(err, "could not scan row into offer struct")
		}
		if err = callback(offer); err != nil {
			return 0, err
		}
		lastID = offer.OfferID
	}

	// rows.Next can stop early because of an error; report it the same way as
	// every other failure path instead of returning a partially advanced ID.
	if err = rows.Err(); err != nil {
		return 0, errors.Wrap(err, "could not iterate over all offers rows")
	}
	return lastID, nil
}
// GetUpdatedOffers returns every offer row whose last_modified_ledger is
// greater than newerThanSequence. Rows marked as deleted are included, so the
// result covers offers created, updated, or deleted after that ledger
// sequence. The error from the underlying query is returned without wrapping.
func (q *Q) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error) {
	modifiedSince := selectOffers.Where("offers.last_modified_ledger > ?", newerThanSequence)

	var updated []Offer
	if err := q.Select(ctx, &updated, modifiedSince); err != nil {
		return updated, err
	}
	return updated, nil
}
// UpsertOffers upserts a batch of offers into the offers table, keyed by
// offer_id.
//
// There is currently no limit on the number of offers this method accepts
// other than the 2GB limit on the query string length, which should be enough
// for each ledger with the current limits.
func (q *Q) UpsertOffers(ctx context.Context, offers []Offer) error {
	// One column-oriented slice per table column; each gets one entry per offer.
	var (
		sellerIDs, offerIDs        []interface{}
		sellingAssets, buyingAssets []interface{}
		amounts                    []interface{}
		priceNs, priceDs, prices   []interface{}
		flagValues                 []interface{}
		modifiedLedgers            []interface{}
		deletedFlags               []interface{}
		sponsors                   []interface{}
	)

	for i := range offers {
		o := &offers[i]
		sellerIDs = append(sellerIDs, o.SellerID)
		offerIDs = append(offerIDs, o.OfferID)
		sellingAssets = append(sellingAssets, o.SellingAsset)
		buyingAssets = append(buyingAssets, o.BuyingAsset)
		amounts = append(amounts, o.Amount)
		priceNs = append(priceNs, o.Pricen)
		priceDs = append(priceDs, o.Priced)
		prices = append(prices, o.Price)
		flagValues = append(flagValues, o.Flags)
		modifiedLedgers = append(modifiedLedgers, o.LastModifiedLedger)
		deletedFlags = append(deletedFlags, o.Deleted)
		sponsors = append(sponsors, o.Sponsor)
	}

	columns := []upsertField{
		{"seller_id", "text", sellerIDs},
		{"offer_id", "bigint", offerIDs},
		{"selling_asset", "text", sellingAssets},
		{"buying_asset", "text", buyingAssets},
		{"amount", "bigint", amounts},
		{"pricen", "integer", priceNs},
		{"priced", "integer", priceDs},
		{"price", "double precision", prices},
		{"flags", "integer", flagValues},
		{"deleted", "bool", deletedFlags},
		{"last_modified_ledger", "integer", modifiedLedgers},
		{"sponsor", "text", sponsors},
	}

	return q.upsertRows(ctx, "offers", "offer_id", columns)
}
// CompactOffers deletes the rows of the offers table that are marked as
// deleted and whose last_modified_ledger is at most cutOffSequence, then
// records cutOffSequence via UpdateOfferCompactionSequence. It returns the
// number of rows affected by the delete.
func (q *Q) CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error) {
	purge := sq.Delete("offers").
		Where("deleted = ?", true).
		Where("last_modified_ledger <= ?", cutOffSequence)

	res, err := q.Exec(ctx, purge)
	if err != nil {
		return 0, errors.Wrap(err, "cannot delete offer rows")
	}

	err = q.UpdateOfferCompactionSequence(ctx, cutOffSequence)
	if err != nil {
		return 0, errors.Wrap(err, "cannot update offer compaction sequence")
	}

	return res.RowsAffected()
}
// selectOffers is the base SELECT on the offers table listing the columns
// loaded into Offer. The queries in this file derive from it by adding WHERE,
// ORDER BY and LIMIT clauses; it applies no filter on the deleted column
// itself.
var selectOffers = sq.Select(`
seller_id,
offer_id,
selling_asset,
buying_asset,
amount,
pricen,
priced,
price,
flags,
deleted,
last_modified_ledger,
sponsor
`).From("offers")