Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/expingest: Remove orderbook graph from ingestion system #2639

Merged
merged 5 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (a *App) Serve() {
}

go a.run()
go a.orderBookStream.Run(a.ctx)

// WaitGroup for all go routines. Makes sure that DB is closed when
// all services gracefully shutdown.
Expand Down Expand Up @@ -414,13 +415,7 @@ func (a *App) Tick() {
var wg sync.WaitGroup
log.Debug("ticking app")
// update ledger state, operation fee state, and stellar-core info in parallel
wg.Add(4)
go func() {
defer wg.Done()
if err := a.orderBookStream.Update(); err != nil {
log.WithField("error", err).Error("could not apply updates from order book stream")
}
}()
wg.Add(3)
go func() { a.UpdateLedgerState(); wg.Done() }()
go func() { a.UpdateFeeStatsState(); wg.Done() }()
go func() { a.UpdateStellarCoreInfo(); wg.Done() }()
Expand Down
26 changes: 24 additions & 2 deletions services/horizon/internal/expingest/orderbook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package expingest

import (
"context"
"database/sql"
"math/rand"
"sort"
Expand All @@ -12,7 +13,10 @@ import (
"github.com/stellar/go/xdr"
)

const verificationFrequency = time.Hour
const (
verificationFrequency = time.Hour
updateFrequency = 30 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change it to 5 sec. so it updates with every ledger (on average)?

Suggested change
updateFrequency = 30 * time.Second
updateFrequency = 5 * time.Second

)

// OrderBookStream updates an in memory graph to be consistent with
// offers in the Horizon DB. Any offers which are created, modified, or removed
Expand Down Expand Up @@ -160,7 +164,7 @@ func (o *OrderBookStream) verifyAllOffers() {
ingestionOffers, err := o.HistoryQ.GetAllOffers()
if err != nil {
// reset last update so that we retry verification on next update
o.lastUpdate = time.Now().Add(time.Hour * -2)
o.lastUpdate = time.Now().Add(verificationFrequency * -2)
log.WithError(err).Info("Could not verify offers because of error from GetAllOffers")
return
}
Expand Down Expand Up @@ -231,3 +235,21 @@ func (o *OrderBookStream) Update() error {
}
return nil
}

// Run will call Update() every 30 seconds until the given context is terminated.
func (o *OrderBookStream) Run(ctx context.Context) {
ticker := time.NewTicker(updateFrequency)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := o.Update(); err != nil {
log.WithError(err).Error("could not apply updates from order book stream")
}
case <-ctx.Done():
log.Info("finished background ticker")
tamirms marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}