exp/lighthorizon: Unify map-reduce and single-process index builders #4423

Merged
260 changes: 245 additions & 15 deletions exp/lighthorizon/index/builder.go
@@ -4,35 +4,174 @@ import (
"context"
"fmt"
"io"
"math"
"sync/atomic"
"time"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
"golang.org/x/sync/errgroup"
)

// BuildIndices reads raw txmeta for the given ledger range from sourceUrl,
// runs the given modules over each transaction, and writes the resulting
// indices to targetUrl.
func BuildIndices(
ctx context.Context,
sourceUrl string, // where is raw txmeta coming from?
targetUrl string, // where should the resulting indices go?
networkPassphrase string,
startLedger, endLedger uint32,
modules []string,
workerCount int,
) error {
indexStore, err := Connect(targetUrl)
if err != nil {
return err
}

// Connect to the source of raw txmeta (e.g. a filesystem or remote archive)
source, err := historyarchive.ConnectBackend(
sourceUrl,
historyarchive.ConnectOptions{
Context: ctx,
NetworkPassphrase: networkPassphrase,
},
)
if err != nil {
return err
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
defer ledgerBackend.Close()

if endLedger == 0 {
latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
if err != nil {
return err
}
endLedger = latest
}

ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
parallel := max(1, workerCount)

startTime := time.Now()
log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
startLedger, endLedger, ledgerCount)
log.Infof("Using %d workers", parallel)

// Create a bunch of workers that process ledgers a checkpoint range at a
// time (better than a ledger at a time to minimize flushes).
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan historyarchive.Range, parallel)

indexBuilder := NewIndexBuilder(indexStore, ledgerBackend, networkPassphrase)
for _, part := range modules {
Contributor: is it worthwhile to use a stronger type, like an enum, to represent module types as they increase in number?

Contributor (Author): I don't think so, because these are supposed to be plug-and-play. Neither their execution order nor their enum value would ever matter.
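The plug-and-play point can be made concrete with a lookup table; a hypothetical sketch (not part of this PR) mapping the same names to the same modules:

var moduleRegistry = map[string]Module{
	"transactions":      ProcessTransaction,
	"accounts":          ProcessAccounts,
	"accounts_unbacked": ProcessAccountsWithoutBackend,
}

With that table, the switch below would reduce to a map lookup plus the same "unknown module" error for missing names.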

switch part {
case "transactions":
indexBuilder.RegisterModule(ProcessTransaction)
case "accounts":
indexBuilder.RegisterModule(ProcessAccounts)
case "accounts_unbacked":
indexBuilder.RegisterModule(ProcessAccountsWithoutBackend)
default:
return fmt.Errorf("Unknown module: %s", part)
}
}

// Submit the work to the channels, breaking up the range into individual
// checkpoint ranges.
go func() {
// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0
nextCheckpoint := (((startLedger / 64) * 64) + 63)
Contributor: Nit: ideal use case for CheckpointManager.
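As a sketch of that nit: the checkpoint arithmetic below could lean on the historyarchive checkpoint helpers. This assumes NewCheckpointManager(0) uses the default 64-ledger frequency and that GetCheckpoint(i) returns the checkpoint ledger of the range containing i:

// Hypothetical alternative to the manual math below (not what this PR merged):
// chk := historyarchive.NewCheckpointManager(0)
// nextCheckpoint := chk.GetCheckpoint(startLedger)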


ledger := startLedger
nextLedger := min(endLedger, ledger+(nextCheckpoint-startLedger))
for ledger <= endLedger {
chunk := historyarchive.Range{Low: ledger, High: nextLedger}
log.Debugf("Submitted [%d, %d] for work", chunk.Low, chunk.High)
ch <- chunk

ledger = nextLedger + 1
nextLedger = min(endLedger, ledger+63) // don't exceed upper bound
}

close(ch)
}()

processed := uint64(0)
for i := 0; i < parallel; i++ {
wg.Go(func() error {
for ledgerRange := range ch {
count := (ledgerRange.High - ledgerRange.Low) + 1
nprocessed := atomic.AddUint64(&processed, uint64(count))

log.Debugf("Working on checkpoint range [%d, %d]",
ledgerRange.Low, ledgerRange.High)

// Assertion for testing
if ledgerRange.High != endLedger && (ledgerRange.High+1)%64 != 0 {
log.Fatalf("Upper ledger isn't a checkpoint: %v", ledgerRange)
}

// Use a locally scoped err here: the enclosing err is shared across
// worker goroutines, so assigning to it would be a data race.
if err := indexBuilder.Build(ctx, ledgerRange); err != nil {
return err
}

printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)

// Flush the indices to storage once per checkpoint range to bound memory usage
if err := indexStore.Flush(); err != nil {
return errors.Wrap(err, "flushing indices failed")
}
}
return nil
})
}

if err := wg.Wait(); err != nil {
return errors.Wrap(err, "one or more workers failed")
}

printProgress("Reading ledgers", uint64(ledgerCount), uint64(ledgerCount), startTime)

// Assertion for testing
if processed != uint64(ledgerCount) {
log.Fatalf("processed %d but expected %d", processed, ledgerCount)
}

log.Infof("Processed %d ledgers via %d workers", processed, parallel)
log.Infof("Uploading indices to %s", targetUrl)
if err := indexStore.Flush(); err != nil {
return errors.Wrap(err, "flushing indices failed")
}

return nil
}
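For reference, a hypothetical call to BuildIndices; the URLs here are made up, and the network and runtime imports (github.com/stellar/go/network, runtime) are assumptions on top of this file:

func exampleBuild(ctx context.Context) error {
	return BuildIndices(
		ctx,
		"file:///mnt/txmeta",  // hypothetical raw txmeta source
		"file:///mnt/indices", // hypothetical index destination
		network.TestNetworkPassphrase,
		2, 0, // endLedger == 0 means "through the latest ledger"
		[]string{"transactions", "accounts"},
		runtime.NumCPU(),
	)
}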

// Module is a way to process ingested data and shove it into an index store.
type Module func(
- idx Store,
+ indexStore Store,
ledger xdr.LedgerCloseMeta,
- checkpoint uint32,
transaction ingest.LedgerTransaction,
) error
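To make the contract concrete, here is a hypothetical module matching this signature; it only tallies operation counts and never touches the store (a sketch, not part of this PR):

var totalOps uint64

func CountOperations(
	indexStore Store,
	ledger xdr.LedgerCloseMeta,
	transaction ingest.LedgerTransaction,
) error {
	// Envelope.Operations() covers every operation in the transaction.
	atomic.AddUint64(&totalOps, uint64(len(transaction.Envelope.Operations())))
	return nil
}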

// IndexBuilder contains everything needed to build indices from ledger ranges.
type IndexBuilder struct {
store Store
- history ledgerbackend.HistoryArchiveBackend
+ history *ledgerbackend.HistoryArchiveBackend
networkPassphrase string

modules []Module
}

func NewIndexBuilder(
indexStore Store,
- backend ledgerbackend.HistoryArchiveBackend,
+ backend *ledgerbackend.HistoryArchiveBackend,
networkPassphrase string,
) *IndexBuilder {
return &IndexBuilder{
@@ -51,18 +190,23 @@ func (builder *IndexBuilder) RegisterModule(module Module) {
// RunModules executes all of the registered modules on the given ledger.
func (builder *IndexBuilder) RunModules(
ledger xdr.LedgerCloseMeta,
- checkpoint uint32,
tx ingest.LedgerTransaction,
) error {
for _, module := range builder.modules {
- if err := module(builder.store, ledger, checkpoint, tx); err != nil {
+ if err := module(builder.store, ledger, tx); err != nil {
return err
}
}

return nil
}

// Build sequentially creates indices for each ledger in the given range based
// on the registered modules.
//
// TODO: We can probably optimize this by doing GetLedger in parallel with the
// ingestion & index building, since the network will be idle during the latter
// portion.
func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchive.Range) error {
for ledgerSeq := ledgerRange.Low; ledgerSeq <= ledgerRange.High; ledgerSeq++ {
ledger, err := builder.history.GetLedger(ctx, ledgerSeq)
@@ -71,8 +215,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
return err
}

- checkpoint := (ledgerSeq / 64) + 1

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
builder.networkPassphrase, ledger)
if err != nil {
@@ -87,7 +229,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
return err
}

- if err := builder.RunModules(ledger, checkpoint, tx); err != nil {
+ if err := builder.RunModules(ledger, tx); err != nil {
return err
}
}
@@ -99,7 +241,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
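Sketching the TODO above Build: GetLedger could overlap with ingestion by prefetching ledgers into a small channel. A hypothetical shape, with error propagation elided:

func (builder *IndexBuilder) buildPipelined(ctx context.Context, ledgerRange historyarchive.Range) {
	ledgers := make(chan xdr.LedgerCloseMeta, 1) // a small buffer keeps the network busy
	go func() {
		defer close(ledgers)
		for seq := ledgerRange.Low; seq <= ledgerRange.High; seq++ {
			ledger, err := builder.history.GetLedger(ctx, seq)
			if err != nil {
				return // a real version would propagate this error
			}
			ledgers <- ledger
		}
	}()
	for ledger := range ledgers {
		_ = ledger // ...build the transaction reader and run modules, as in Build...
	}
}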
func ProcessTransaction(
indexStore Store,
ledger xdr.LedgerCloseMeta,
- _ uint32,
tx ingest.LedgerTransaction,
) error {
return indexStore.AddTransactionToIndexes(
@@ -110,10 +251,10 @@ func ProcessTransaction(

func ProcessAccounts(
indexStore Store,
- _ xdr.LedgerCloseMeta,
- checkpoint uint32,
+ ledger xdr.LedgerCloseMeta,
tx ingest.LedgerTransaction,
) error {
checkpoint := (ledger.LedgerSequence() / 64) + 1
allParticipants, err := getParticipants(tx)
if err != nil {
return err
@@ -148,6 +289,48 @@ func ProcessAccounts(

return nil
}

func ProcessAccountsWithoutBackend(
Contributor: We probably want a ProcessTransactionsWithoutBackend as well, no?

Contributor (Author): There was no code doing that, so I didn't add it 🤷‍♂️ This new module was ripped straight from the existing variant, and you can see here that transactions were added with a backend in the original version.

indexStore Store,
ledger xdr.LedgerCloseMeta,
tx ingest.LedgerTransaction,
) error {
checkpoint := (ledger.LedgerSequence() / 64) + 1
allParticipants, err := getParticipants(tx)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_all", allParticipants)
if err != nil {
return err
}

paymentsParticipants, err := getPaymentParticipants(tx)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_payments", paymentsParticipants)
if err != nil {
return err
}

if tx.Result.Successful() {
err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_all", allParticipants)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_payments", paymentsParticipants)
if err != nil {
return err
}
}

return nil
}

func getPaymentParticipants(transaction ingest.LedgerTransaction) ([]string, error) {
return participantsForOperations(transaction, true)
}
@@ -263,13 +446,16 @@ func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayment
// Requires meta
// sponsor, err := operation.getSponsor()
// if err != nil {
// return nil, err
// }
// if sponsor != nil {
// otherParticipants = append(otherParticipants, *sponsor)
// }
}

// FIXME: This could probably be a set rather than a list, since there's no
// reason to track a participating account more than once if they are
// participants across multiple operations.
return participants, nil
}
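One way to resolve that FIXME, as a sketch: collapse the list into a set before returning (hypothetical helper, not in this PR):

func dedupeParticipants(in []string) []string {
	seen := make(map[string]struct{}, len(in))
	out := make([]string, 0, len(in))
	for _, participant := range in {
		if _, ok := seen[participant]; !ok {
			seen[participant] = struct{}{}
			out = append(out, participant)
		}
	}
	return out
}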

@@ -292,3 +478,47 @@ func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string {
}
return []string{}
}

func printProgress(prefix string, done, total uint64, startTime time.Time) {
// This should never happen, more of a runtime assertion for now.
// We can remove it when production-ready.
if done > total {
panic(fmt.Errorf("error for %s: done > total (%d > %d)",
prefix, done, total))
}

progress := float64(done) / float64(total)
elapsed := time.Since(startTime)

// Approximate based on how many ledgers are left and how long this much
// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this
// assumes consistent ledger load).
Contributor: Maybe we could improve the estimation by counting ops instead of ledgers?

remaining := (float64(elapsed) / float64(done)) * float64(total-done)

var remainingStr string
if math.IsInf(remaining, 0) || math.IsNaN(remaining) {
remainingStr = "unknown"
} else {
remainingStr = time.Duration(remaining).Round(time.Millisecond).String()
}

log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix,
100*progress, done, total,
elapsed.Round(time.Millisecond),
remainingStr,
)
}
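Per the review suggestion above, progress could also be weighted by operations rather than ledgers, e.g. by feeding printProgress an ops tally like the hypothetical CountOperations sketch earlier:

// opsDone := atomic.LoadUint64(&totalOps) // hypothetical running tally
// printProgress("Ingesting operations", opsDone, estimatedTotalOps, startTime)

The catch is that the true operation total isn't known up front, so estimatedTotalOps would itself be an estimate, e.g. the average ops per ledger seen so far multiplied by ledgerCount.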

func min(a, b uint32) uint32 {
if a < b {
return a
}
return b
}

func max(a, b int) int {
if a > b {
return a
}
return b
}