diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go
new file mode 100644
index 0000000000..cacdb17580
--- /dev/null
+++ b/exp/lighthorizon/index/builder.go
@@ -0,0 +1,331 @@
+package index
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/stellar/go/historyarchive"
+	"github.com/stellar/go/ingest"
+	"github.com/stellar/go/ingest/ledgerbackend"
+	"github.com/stellar/go/support/log"
+	"github.com/stellar/go/toid"
+	"github.com/stellar/go/xdr"
+)
+
+// Module is a way to process data and store it into an index.
+type Module func(
+	idx Store,
+	ledger xdr.LedgerCloseMeta,
+	checkpoint uint32,
+	transaction ingest.LedgerTransaction,
+) error
+
+// IndexBuilder contains everything needed to build indices from ledger ranges.
+type IndexBuilder struct {
+	store             Store
+	history           ledgerbackend.HistoryArchiveBackend
+	networkPassphrase string
+
+	modules []Module
+}
+
+// NewIndexBuilder returns an IndexBuilder that reads ledgers from backend and
+// hands indexStore to every registered module. No modules are registered
+// initially; add them with RegisterModule.
+func NewIndexBuilder(
+	indexStore Store,
+	backend ledgerbackend.HistoryArchiveBackend,
+	networkPassphrase string,
+) *IndexBuilder {
+	return &IndexBuilder{
+		store:             indexStore,
+		history:           backend,
+		networkPassphrase: networkPassphrase,
+	}
+}
+
+// RegisterModule adds a module to process every given ledger. It is not
+// threadsafe and all calls should be made *before* any calls to `Build`.
+func (builder *IndexBuilder) RegisterModule(module Module) {
+	builder.modules = append(builder.modules, module)
+}
+
+// RunModules executes all of the registered modules on the given ledger.
+// Modules run in registration order; the first error aborts the run and is
+// returned as-is.
+func (builder *IndexBuilder) RunModules(
+	ledger xdr.LedgerCloseMeta,
+	checkpoint uint32,
+	tx ingest.LedgerTransaction,
+) error {
+	for _, module := range builder.modules {
+		if err := module(builder.store, ledger, checkpoint, tx); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Build runs the registered modules over every transaction of every ledger in
+// ledgerRange (Low and High are both inclusive), fetching ledgers one at a
+// time from the history backend. It stops at, and returns, the first error
+// from fetching a ledger, reading its transactions, or running a module.
+func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchive.Range) error {
+	for ledgerSeq := ledgerRange.Low; ledgerSeq <= ledgerRange.High; ledgerSeq++ {
+		ledger, err := builder.history.GetLedger(ctx, ledgerSeq)
+		if err != nil {
+			log.WithField("error", err).Errorf("error getting ledger %d", ledgerSeq)
+			return err
+		}
+
+		// 1-based number of the 64-ledger window [64k, 64k+63] containing
+		// ledgerSeq. NOTE(review): confirm index readers use the same numbering.
+		checkpoint := (ledgerSeq / 64) + 1
+
+		reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
+			builder.networkPassphrase, ledger)
+		if err != nil {
+			return err
+		}
+
+		for {
+			tx, err := reader.Read()
+			if err == io.EOF {
+				break
+			} else if err != nil {
+				return err
+			}
+
+			if err := builder.RunModules(ledger, checkpoint, tx); err != nil {
+				return err
+			}
+		}
+
+		// Stop explicitly at the upper bound: when High is the largest
+		// uint32, ledgerSeq++ wraps to 0 and the loop would never terminate.
+		if ledgerSeq == ledgerRange.High {
+			break
+		}
+	}
+
+	return nil
+}
+
+// ProcessTransaction is a Module that records the transaction in the store via
+// AddTransactionToIndexes, passing its TOID (ledger sequence, transaction
+// index, operation 0) and its hash. The checkpoint argument is ignored.
+func ProcessTransaction(
+	indexStore Store,
+	ledger xdr.LedgerCloseMeta,
+	_ uint32,
+	tx ingest.LedgerTransaction,
+) error {
+	return indexStore.AddTransactionToIndexes(
+		toid.New(int32(ledger.LedgerSequence()), int32(tx.Index), 0).ToInt64(),
+		tx.Result.TransactionHash,
+	)
+}
+
+// ProcessAccounts is a Module that adds the transaction's participants to the
+// per-checkpoint account indices: "all_all" and "all_payments" for every
+// transaction, plus "successful_all" and "successful_payments" when the
+// transaction succeeded. The ledger argument is ignored.
+func ProcessAccounts(
+	indexStore Store,
+	_ xdr.LedgerCloseMeta,
+	checkpoint uint32,
+	tx ingest.LedgerTransaction,
+) error {
+	allParticipants, err := getParticipants(tx)
+	if err != nil {
+		return err
+	}
+
+	err = indexStore.AddParticipantsToIndexes(checkpoint, "all_all", allParticipants)
+	if err != nil {
+		return err
+	}
+
+	paymentsParticipants, err := getPaymentParticipants(tx)
+	if err != nil {
+		return err
+	}
+
+	err = indexStore.AddParticipantsToIndexes(checkpoint, "all_payments", paymentsParticipants)
+	if err != nil {
+		return err
+	}
+
+	if tx.Result.Successful() {
+		err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_all", allParticipants)
+		if err != nil {
+			return err
+		}
+
+		err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_payments", paymentsParticipants)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// getPaymentParticipants returns the participants of the transaction's
+// payment-like operations only (see participantsForOperations).
+func getPaymentParticipants(transaction ingest.LedgerTransaction) ([]string, error) {
+	return participantsForOperations(transaction, true)
+}
+
+// getParticipants returns the participants of all of the transaction's
+// operations (see participantsForOperations).
+func getParticipants(transaction ingest.LedgerTransaction) ([]string, error) {
+	return participantsForOperations(transaction, false)
+}
+
+// participantsForOperations returns the addresses of the accounts that take
+// part in the transaction's operations: each operation's source account
+// (falling back to the transaction source) followed by any counterparties
+// named in the operation body. Addresses are not deduplicated.
+//
+// When onlyPayments is true, every operation other than create-account,
+// payment, path-payment and account-merge is skipped entirely. Otherwise an
+// operation type missing from the switch below results in an error.
+func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayments bool) ([]string, error) {
+	var participants []string
+
+	for opindex, operation := range transaction.Envelope.Operations() {
+		opSource := operation.SourceAccount
+		if opSource == nil {
+			txSource := transaction.Envelope.SourceAccount()
+			opSource = &txSource
+		}
+
+		switch operation.Body.Type {
+		case xdr.OperationTypeCreateAccount,
+			xdr.OperationTypePayment,
+			xdr.OperationTypePathPaymentStrictReceive,
+			xdr.OperationTypePathPaymentStrictSend,
+			xdr.OperationTypeAccountMerge:
+			participants = append(participants, opSource.Address())
+		default:
+			if onlyPayments {
+				continue
+			}
+			participants = append(participants, opSource.Address())
+		}
+
+		switch operation.Body.Type {
+		case xdr.OperationTypeCreateAccount:
+			participants = append(participants, operation.Body.MustCreateAccountOp().Destination.Address())
+		case xdr.OperationTypePayment:
+			participants = append(participants, operation.Body.MustPaymentOp().Destination.ToAccountId().Address())
+		case xdr.OperationTypePathPaymentStrictReceive:
+			participants = append(participants, operation.Body.MustPathPaymentStrictReceiveOp().Destination.ToAccountId().Address())
+		case xdr.OperationTypePathPaymentStrictSend:
+			participants = append(participants, operation.Body.MustPathPaymentStrictSendOp().Destination.ToAccountId().Address())
+		case xdr.OperationTypeManageBuyOffer:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeManageSellOffer:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeCreatePassiveSellOffer:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeSetOptions:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeChangeTrust:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeAllowTrust:
+			participants = append(participants, operation.Body.MustAllowTrustOp().Trustor.Address())
+		case xdr.OperationTypeAccountMerge:
+			participants = append(participants, operation.Body.MustDestination().ToAccountId().Address())
+		case xdr.OperationTypeInflation:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeManageData:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeBumpSequence:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeCreateClaimableBalance:
+			for _, c := range operation.Body.MustCreateClaimableBalanceOp().Claimants {
+				participants = append(participants, c.MustV0().Destination.Address())
+			}
+		case xdr.OperationTypeClaimClaimableBalance:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeBeginSponsoringFutureReserves:
+			participants = append(participants, operation.Body.MustBeginSponsoringFutureReservesOp().SponsoredId.Address())
+		case xdr.OperationTypeEndSponsoringFutureReserves:
+			// Failed transactions may not have a compliant sandwich structure
+			// we can rely on (e.g. invalid nesting or a begin operation with the wrong sponsoree ID)
+			// and thus we bail out since we could return incorrect information.
+			if transaction.Result.Successful() {
+				sponsoree := transaction.Envelope.SourceAccount().ToAccountId().Address()
+				if operation.SourceAccount != nil {
+					sponsoree = operation.SourceAccount.Address()
+				}
+				operations := transaction.Envelope.Operations()
+				for i := int(opindex) - 1; i >= 0; i-- {
+					if beginOp, ok := operations[i].Body.GetBeginSponsoringFutureReservesOp(); ok &&
+						beginOp.SponsoredId.Address() == sponsoree {
+						participants = append(participants, beginOp.SponsoredId.Address())
+					}
+				}
+			}
+		case xdr.OperationTypeRevokeSponsorship:
+			op := operation.Body.MustRevokeSponsorshipOp()
+			switch op.Type {
+			case xdr.RevokeSponsorshipTypeRevokeSponsorshipLedgerEntry:
+				participants = append(participants, getLedgerKeyParticipants(*op.LedgerKey)...)
+			case xdr.RevokeSponsorshipTypeRevokeSponsorshipSigner:
+				participants = append(participants, op.Signer.AccountId.Address())
+				// Only op.Signer.AccountId is added. The signer key itself is not, because
+				// a signer can be an arbitrary account and this could spam its history.
+			}
+		case xdr.OperationTypeClawback:
+			op := operation.Body.MustClawbackOp()
+			participants = append(participants, op.From.ToAccountId().Address())
+		case xdr.OperationTypeClawbackClaimableBalance:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeSetTrustLineFlags:
+			op := operation.Body.MustSetTrustLineFlagsOp()
+			participants = append(participants, op.Trustor.Address())
+		case xdr.OperationTypeLiquidityPoolDeposit:
+			// the only direct participant is the source_account
+		case xdr.OperationTypeLiquidityPoolWithdraw:
+			// the only direct participant is the source_account
+		default:
+			return nil, fmt.Errorf("unknown operation type: %s", operation.Body.Type)
+		}
+
+		// Requires meta
+		// sponsor, err := operation.getSponsor()
+		// if err != nil {
+		// 	return nil, err
+		// }
+		// if sponsor != nil {
+		// 	otherParticipants = append(otherParticipants, *sponsor)
+		// }
+	}
+
+	return participants, nil
+}
+
+// getLedgerKeyParticipants returns a list of accounts that are considered
+// "participants" in a particular ledger entry.
+//
+// This list will have zero or one element, making it easy to expand via `...`.
+func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string {
+	switch ledgerKey.Type {
+	case xdr.LedgerEntryTypeAccount:
+		return []string{ledgerKey.Account.AccountId.Address()}
+	case xdr.LedgerEntryTypeData:
+		return []string{ledgerKey.Data.AccountId.Address()}
+	case xdr.LedgerEntryTypeOffer:
+		return []string{ledgerKey.Offer.SellerId.Address()}
+	case xdr.LedgerEntryTypeTrustline:
+		return []string{ledgerKey.TrustLine.AccountId.Address()}
+	case xdr.LedgerEntryTypeClaimableBalance:
+		// nothing to do
+	}
+	return []string{}
+}
diff --git a/exp/lighthorizon/index/cmd/single/main.go b/exp/lighthorizon/index/cmd/single/main.go
index 46f2f07562..8d95c35315 100644
--- a/exp/lighthorizon/index/cmd/single/main.go
+++ b/exp/lighthorizon/index/cmd/single/main.go
@@ -4,36 +4,33 @@ import (
 	"context"
 	"flag"
 	"fmt"
-	"io"
+	"math"
+	"runtime"
 	"strings"
 	"sync/atomic"
 	"time"

 	"github.com/stellar/go/exp/lighthorizon/index"
 	"github.com/stellar/go/historyarchive"
-	"github.com/stellar/go/ingest"
 	"github.com/stellar/go/ingest/ledgerbackend"
 	"github.com/stellar/go/network"
 	"github.com/stellar/go/support/log"
-	"github.com/stellar/go/toid"
-	"github.com/stellar/go/xdr"
 	"golang.org/x/sync/errgroup"
 )

-var (
-	// Should we use runtime.NumCPU() for a reasonable default?
-	parallel = uint32(20)
-)
-
 func main() {
 	sourceUrl := flag.String("source", "gcs://horizon-archive-poc", "history archive url to read txmeta files")
 	targetUrl := flag.String("target", "file://indexes", "where to write indexes")
 	networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase")
-	start := flag.Int("start", -1, "ledger to start at (default: earliest)")
-	end := flag.Int("end", -1, "ledger to end at (default: latest)")
+	start := flag.Int("start", -1, "ledger to start at (inclusive, default: earliest)")
+	end := flag.Int("end", -1, "ledger to end at (inclusive, default: latest)")
 	modules := flag.String("modules", "accounts,transactions", "comma-separated list of modules to index (default: all)")
-	flag.Parse()

+	// Default to one worker per CPU minus one, so the machine stays usable
+	// while this runs; the value is clamped to at least 1 further down.
+	workerCount := flag.Int("workers", runtime.NumCPU()-1, "number of workers (default: # of CPUs - 1)")
+
+	flag.Parse()
 	log.SetLevel(log.InfoLevel)

 	ctx := context.Background()
@@ -59,123 +56,81 @@ func main() {

 	startTime := time.Now()

-	if *start < 2 {
-		*start = 2
-	}
-	if *end == -1 {
+	startLedger := uint32(max(*start, 2))
+	endLedger := uint32(max(*end, 0)) // clamp first: a negative int would wrap when converted
+	if *end < 0 { // test the signed flag; a uint32 can never be < 0
 		latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
 		if err != nil {
 			panic(err)
 		}
-		*end = int(latest)
+		endLedger = latest
 	}
-	startLedger := uint32(*start) //uint32((39680056) / 64)
-	endLedger := uint32(*end)
-	all := endLedger - startLedger
+	ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
+	parallel := max(1, *workerCount)

-	wg, ctx := errgroup.WithContext(ctx)
+	log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
+		startLedger, endLedger, ledgerCount)
+	log.Infof("Using %d workers", parallel)

-	ch := make(chan uint32, parallel)
+	// Create a bunch of workers that process ledgers a checkpoint range at a
+	// time (better than a ledger at a time to minimize flushes).
+	wg, ctx := errgroup.WithContext(ctx)
+	ch := make(chan historyarchive.Range, parallel)
+
+	indexBuilder := index.NewIndexBuilder(indexStore, *ledgerBackend, *networkPassphrase)
+	for _, part := range strings.Split(*modules, ",") {
+		switch part {
+		case "transactions":
+			indexBuilder.RegisterModule(index.ProcessTransaction)
+		case "accounts":
+			indexBuilder.RegisterModule(index.ProcessAccounts)
+		default:
+			panic(fmt.Errorf("Unknown module: %s", part))
+		}
+	}

+	// Submit the work to the channels, breaking up the range into checkpoints.
 	go func() {
-		for i := startLedger; i <= endLedger; i++ {
-			ch <- i
+		// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0
+		nextCheckpoint := (((startLedger / 64) * 64) + 63)
+
+		ledger := startLedger
+		nextLedger := uint32(min(int(endLedger), int(nextCheckpoint))) // first range must not pass endLedger either
+		for ledger <= endLedger {
+			ch <- historyarchive.Range{Low: ledger, High: nextLedger}
+
+			ledger = nextLedger + 1
+			// Ensure we don't exceed the upper ledger bound
+			nextLedger = uint32(min(int(endLedger), int(ledger+63)))
 		}
+
 		close(ch)
 	}()

 	processed := uint64(0)
-	for i := uint32(0); i < parallel; i++ {
+	for i := 0; i < parallel; i++ {
 		wg.Go(func() error {
-			for ledgerSeq := range ch {
-				fmt.Println("Processing ledger", ledgerSeq)
-				ledger, err := ledgerBackend.GetLedger(ctx, ledgerSeq)
-				if err != nil {
-					log.WithField("error", err).Error("error getting ledgers")
-					ch <- ledgerSeq
-					continue
-				}
+			for ledgerRange := range ch {
+				count := (ledgerRange.High - ledgerRange.Low) + 1
+				nprocessed := atomic.AddUint64(&processed, uint64(count))
+
+				log.Debugf("Working on checkpoint range %+v", ledgerRange)

-				checkpoint := ledgerSeq / 64
+				// Assertion for testing
+				if ledgerRange.High != endLedger &&
+					(ledgerRange.High+1)%64 != 0 {
+					log.Fatalf("Uh oh: bad range")
+				}

-				reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(*networkPassphrase, ledger)
+				err := indexBuilder.Build(ctx, ledgerRange) // local err: workers must not share the outer variable
 				if err != nil {
 					return err
 				}

-				for {
-					tx, err := reader.Read()
-					if err != nil {
-						if err == io.EOF {
-							break
-						}
-						return err
-					}
-
-					if strings.Contains(*modules, "transactions") {
-						indexStore.AddTransactionToIndexes(
-							toid.New(int32(ledger.LedgerSequence()), int32(tx.Index), 0).ToInt64(),
-							tx.Result.TransactionHash,
-						)
-					}
-
-					if strings.Contains(*modules, "accounts") {
-						allParticipants, err := participantsForOperations(tx, false)
-						if err != nil {
-							return err
-						}
-
-						err = indexStore.AddParticipantsToIndexes(checkpoint, "all_all", allParticipants)
-						if err != nil {
-							return err
-						}
-
-						paymentsParticipants, err := participantsForOperations(tx, true)
-						if err != nil {
-							return err
-						}
-
-						err = indexStore.AddParticipantsToIndexes(checkpoint, "all_payments", paymentsParticipants)
-						if err != nil {
-							return err
-						}
-
-						if tx.Result.Successful() {
-							allParticipants, err := participantsForOperations(tx, false)
-							if err != nil {
-								return err
-							}
-
-							err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_all", allParticipants)
-							if err != nil {
-								return err
-							}
-
-							paymentsParticipants, err := participantsForOperations(tx, true)
-							if err != nil {
-								return err
-							}
-
-							err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_payments", paymentsParticipants)
-							if err != nil {
-								return err
-							}
-						}
-					}
-				}
-			}
+				printProgress("Reading ledgers",
+					nprocessed, uint64(ledgerCount), startTime)

-			nprocessed := atomic.AddUint64(&processed, 1)
-
-			if nprocessed%100 == 0 {
-				log.Infof(
-					"Reading checkpoints... - %.2f%% - elapsed: %s, remaining: %s",
-					(float64(nprocessed)/float64(all))*100,
-					time.Since(startTime).Round(1*time.Second),
-					(time.Duration(int64(time.Since(startTime))*int64(all)/int64(nprocessed)) - time.Since(startTime)).Round(1*time.Second),
-				)
-
-				// Clear indexes to save memory
+				// Upload indices once per checkpoint to save memory
 				if err := indexStore.Flush(); err != nil {
 					return err
 				}
@@ -187,142 +142,62 @@ func main() {
 	if err := wg.Wait(); err != nil {
 		panic(err)
 	}
-	log.Infof("Uploading indexes")
+
+	printProgress("Reading ledgers",
+		uint64(ledgerCount), uint64(ledgerCount), startTime)
+
+	// Assertion for testing
+	if processed != uint64(ledgerCount) {
+		log.Fatalf("processed %d but expected %d", processed, ledgerCount)
+	}
+
+	log.Infof("Processed %d ledgers via %d workers", processed, parallel)
+	log.Infof("Uploading indices to %s", *targetUrl)
 	if err := indexStore.Flush(); err != nil {
 		panic(err)
 	}
 }

-func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayments bool) ([]string, error) {
-	var participants []string
+func printProgress(prefix string, done, total uint64, startTime time.Time) {
+	// This should never happen, more of a runtime assertion for now.
+	// We can remove it when production-ready.
+	if done > total {
+		panic(fmt.Errorf("error for %s: done > total (%d > %d)",
+			prefix, done, total))
+	}

-	for opindex, operation := range transaction.Envelope.Operations() {
-		opSource := operation.SourceAccount
-		if opSource == nil {
-			txSource := transaction.Envelope.SourceAccount()
-			opSource = &txSource
-		}
+	progress := float64(done) / float64(total)
+	elapsed := time.Since(startTime)

-		switch operation.Body.Type {
-		case xdr.OperationTypeCreateAccount,
-			xdr.OperationTypePayment,
-			xdr.OperationTypePathPaymentStrictReceive,
-			xdr.OperationTypePathPaymentStrictSend,
-			xdr.OperationTypeAccountMerge:
-			participants = append(participants, opSource.Address())
-		default:
-			if onlyPayments {
-				continue
-			}
-			participants = append(participants, opSource.Address())
-		}
+	// Approximate based on how many ledgers are left and how long this much
+	// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this
+	// assumes consistent ledger load).
+	remaining := (float64(elapsed) / float64(done)) * float64(total-done)

-		switch operation.Body.Type {
-		case xdr.OperationTypeCreateAccount:
-			participants = append(participants, operation.Body.MustCreateAccountOp().Destination.Address())
-		case xdr.OperationTypePayment:
-			participants = append(participants, operation.Body.MustPaymentOp().Destination.ToAccountId().Address())
-		case xdr.OperationTypePathPaymentStrictReceive:
-			participants = append(participants, operation.Body.MustPathPaymentStrictReceiveOp().Destination.ToAccountId().Address())
-		case xdr.OperationTypePathPaymentStrictSend:
-			participants = append(participants, operation.Body.MustPathPaymentStrictSendOp().Destination.ToAccountId().Address())
-		case xdr.OperationTypeManageBuyOffer:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeManageSellOffer:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeCreatePassiveSellOffer:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeSetOptions:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeChangeTrust:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeAllowTrust:
-			participants = append(participants, operation.Body.MustAllowTrustOp().Trustor.Address())
-		case xdr.OperationTypeAccountMerge:
-			participants = append(participants, operation.Body.MustDestination().ToAccountId().Address())
-		case xdr.OperationTypeInflation:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeManageData:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeBumpSequence:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeCreateClaimableBalance:
-			for _, c := range operation.Body.MustCreateClaimableBalanceOp().Claimants {
-				participants = append(participants, c.MustV0().Destination.Address())
-			}
-		case xdr.OperationTypeClaimClaimableBalance:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeBeginSponsoringFutureReserves:
-			participants = append(participants, operation.Body.MustBeginSponsoringFutureReservesOp().SponsoredId.Address())
-		case xdr.OperationTypeEndSponsoringFutureReserves:
-			// Failed transactions may not have a compliant sandwich structure
-			// we can rely on (e.g. invalid nesting or a being operation with the wrong sponsoree ID)
-			// and thus we bail out since we could return incorrect information.
-			if transaction.Result.Successful() {
-				sponsoree := transaction.Envelope.SourceAccount().ToAccountId().Address()
-				if operation.SourceAccount != nil {
-					sponsoree = operation.SourceAccount.Address()
-				}
-				operations := transaction.Envelope.Operations()
-				for i := int(opindex) - 1; i >= 0; i-- {
-					if beginOp, ok := operations[i].Body.GetBeginSponsoringFutureReservesOp(); ok &&
-						beginOp.SponsoredId.Address() == sponsoree {
-						participants = append(participants, beginOp.SponsoredId.Address())
-					}
-				}
-			}
-		case xdr.OperationTypeRevokeSponsorship:
-			op := operation.Body.MustRevokeSponsorshipOp()
-			switch op.Type {
-			case xdr.RevokeSponsorshipTypeRevokeSponsorshipLedgerEntry:
-				participants = append(participants, getLedgerKeyParticipants(*op.LedgerKey)...)
-			case xdr.RevokeSponsorshipTypeRevokeSponsorshipSigner:
-				participants = append(participants, op.Signer.AccountId.Address())
-				// We don't add signer as a participant because a signer can be arbitrary account.
-				// This can spam successful operations history of any account.
-			}
-		case xdr.OperationTypeClawback:
-			op := operation.Body.MustClawbackOp()
-			participants = append(participants, op.From.ToAccountId().Address())
-		case xdr.OperationTypeClawbackClaimableBalance:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeSetTrustLineFlags:
-			op := operation.Body.MustSetTrustLineFlagsOp()
-			participants = append(participants, op.Trustor.Address())
-		case xdr.OperationTypeLiquidityPoolDeposit:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeLiquidityPoolWithdraw:
-			// the only direct participant is the source_account
-		default:
-			return nil, fmt.Errorf("unknown operation type: %s", operation.Body.Type)
-		}
-
-		// Requires meta
-		// sponsor, err := operation.getSponsor()
-		// if err != nil {
-		// 	return nil, err
-		// }
-		// if sponsor != nil {
-		// 	otherParticipants = append(otherParticipants, *sponsor)
-		// }
+	var remainingStr string
+	if math.IsInf(remaining, 0) || math.IsNaN(remaining) {
+		remainingStr = "unknown"
+	} else {
+		remainingStr = time.Duration(remaining).Round(time.Millisecond).String()
 	}

-	return participants, nil
+	log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix,
+		100*progress, done, total,
+		elapsed.Round(time.Millisecond),
+		remainingStr,
+	)
+}
+
+func max(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
 }

-func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string {
-	var result []string
-	switch ledgerKey.Type {
-	case xdr.LedgerEntryTypeAccount:
-		result = append(result, ledgerKey.Account.AccountId.Address())
-	case xdr.LedgerEntryTypeClaimableBalance:
-		// nothing to do
-	case xdr.LedgerEntryTypeData:
-		result = append(result, ledgerKey.Data.AccountId.Address())
-	case xdr.LedgerEntryTypeOffer:
-		result = append(result, ledgerKey.Offer.SellerId.Address())
-	case xdr.LedgerEntryTypeTrustline:
-		result = append(result, ledgerKey.TrustLine.AccountId.Address())
+func min(a, b int) int {
+	if a < b {
+		return a
 	}
-	return result
+	return b
 }
diff --git a/toid/main.go b/toid/main.go
index 0b8a477569..19d017b7f1 100644
--- a/toid/main.go
+++ b/toid/main.go
@@ -127,6 +127,9 @@ func (id *ID) IncOperationOrder() {
 }

 // New creates a new total order ID
+//
+// FIXME: I feel like since ledger sequences are uint32s, TOIDs should
+// take that into account for the ledger parameter...
 func New(ledger int32, tx int32, op int32) *ID {
 	return &ID{
 		LedgerSequence: ledger,