diff --git a/exp/lighthorizon/adapters/transaction.go b/exp/lighthorizon/adapters/transaction.go index 781e71bd99..6942668c8d 100644 --- a/exp/lighthorizon/adapters/transaction.go +++ b/exp/lighthorizon/adapters/transaction.go @@ -11,8 +11,8 @@ import ( "time" "unicode/utf8" - "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/common" + "github.com/stellar/go/exp/lighthorizon/ingester" "github.com/stellar/go/network" protocol "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/support/render/hal" @@ -211,8 +211,8 @@ func signatures(xdrSignatures []xdr.DecoratedSignature) []string { return signatures } -func memoType(transaction archive.LedgerTransaction) string { - switch transaction.Envelope.Memo().Type { +func memoType(tx ingester.LedgerTransaction) string { + switch tx.Envelope.Memo().Type { case xdr.MemoTypeMemoNone: return "none" case xdr.MemoTypeMemoText: @@ -224,13 +224,13 @@ func memoType(transaction archive.LedgerTransaction) string { case xdr.MemoTypeMemoReturn: return "return" default: - panic(fmt.Errorf("invalid memo type: %v", transaction.Envelope.Memo().Type)) + panic(fmt.Errorf("invalid memo type: %v", tx.Envelope.Memo().Type)) } } -func memo(transaction archive.LedgerTransaction) (value string, valid bool) { +func memo(tx ingester.LedgerTransaction) (value string, valid bool) { valid = true - memo := transaction.Envelope.Memo() + memo := tx.Envelope.Memo() switch memo.Type { case xdr.MemoTypeMemoNone: diff --git a/exp/lighthorizon/adapters/transaction_test.go b/exp/lighthorizon/adapters/transaction_test.go index 1eac3042f7..5a8ba4ab80 100644 --- a/exp/lighthorizon/adapters/transaction_test.go +++ b/exp/lighthorizon/adapters/transaction_test.go @@ -11,8 +11,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/common" + "github.com/stellar/go/exp/lighthorizon/ingester" + 
"github.com/stellar/go/ingest" "github.com/stellar/go/network" protocol "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/toid" @@ -52,15 +53,17 @@ func TestTransactionAdapter(t *testing.T) { closeTimestamp := expectedTx.LedgerCloseTime.UTC().Unix() tx := common.Transaction{ - LedgerTransaction: &archive.LedgerTransaction{ - Index: 0, - Envelope: txEnv, - Result: xdr.TransactionResultPair{ - TransactionHash: xdr.Hash{}, - Result: txResult, + LedgerTransaction: &ingester.LedgerTransaction{ + LedgerTransaction: &ingest.LedgerTransaction{ + Index: 0, + Envelope: txEnv, + Result: xdr.TransactionResultPair{ + TransactionHash: xdr.Hash{}, + Result: txResult, + }, + FeeChanges: txFeeMeta, + UnsafeMeta: txMeta, }, - FeeChanges: txFeeMeta, - UnsafeMeta: txMeta, }, LedgerHeader: &xdr.LedgerHeader{ LedgerSeq: xdr.Uint32(expectedTx.Ledger), diff --git a/exp/lighthorizon/archive/ingest_archive.go b/exp/lighthorizon/archive/ingest_archive.go deleted file mode 100644 index b5de038686..0000000000 --- a/exp/lighthorizon/archive/ingest_archive.go +++ /dev/null @@ -1,151 +0,0 @@ -package archive - -import ( - "context" - "fmt" - "net/url" - - "github.com/stellar/go/exp/lighthorizon/index" - "github.com/stellar/go/ingest" - "github.com/stellar/go/ingest/ledgerbackend" - "github.com/stellar/go/metaarchive" - "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/log" - "github.com/stellar/go/support/storage" - - "github.com/stellar/go/historyarchive" - "github.com/stellar/go/xdr" -) - -type ArchiveConfig struct { - SourceUrl string - NetworkPassphrase string - CacheDir string - CacheSize int -} - -func NewIngestArchive(config ArchiveConfig) (Archive, error) { - if config.CacheSize <= 0 { - return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) - } - - parsed, err := url.Parse(config.SourceUrl) - if err != nil { - return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) - } - - 
region := "" - needsCache := true - switch parsed.Scheme { - case "file": - // We should only avoid a cache if the ledgers are already local. - needsCache = false - - case "s3": - // We need to extract the region if it's specified. - region = parsed.Query().Get("region") - } - - // Now, set up a simple filesystem-like access to the backend and wrap it in - // a local on-disk LRU cache if we can. - source, err := historyarchive.ConnectBackend( - config.SourceUrl, - storage.ConnectOptions{ - Context: context.Background(), - S3Region: region, - }, - ) - if err != nil { - return nil, err - } - - if needsCache { - cache, err := storage.MakeOnDiskCache(source, - config.CacheDir, uint(config.CacheSize)) - - if err != nil { // warn but continue w/o cache - log.WithField("path", config.CacheDir). - WithError(err). - Warnf("Failed to create cached ledger backend") - } else { - log.WithField("path", config.CacheDir). - Infof("On-disk cache configured") - source = cache - } - } - - metaArchive := metaarchive.NewMetaArchive(source) - - ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(metaArchive) - return ingestArchive{ledgerBackend}, nil -} - -// This is an implementation of LightHorizon Archive that uses the existing horizon ingestion backend. 
-type ingestArchive struct { - *ledgerbackend.HistoryArchiveBackend -} - -func (a ingestArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) { - ingestReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) - - if err != nil { - return nil, err - } - - return &ingestTransactionReaderAdaption{ingestReader}, nil -} - -func (a ingestArchive) GetTransactionParticipants(tx LedgerTransaction) (set.Set[string], error) { - participants, err := index.GetTransactionParticipants(a.ingestTx(tx)) - if err != nil { - return nil, err - } - - s := set.NewSet[string](len(participants)) - s.AddSlice(participants) - return s, nil -} - -func (a ingestArchive) GetOperationParticipants(tx LedgerTransaction, op xdr.Operation, opIndex int) (set.Set[string], error) { - participants, err := index.GetOperationParticipants(a.ingestTx(tx), op, opIndex) - if err != nil { - return nil, err - } - - s := set.NewSet[string](len(participants)) - s.AddSlice(participants) - return s, nil -} - -func (ingestArchive) ingestTx(transaction LedgerTransaction) ingest.LedgerTransaction { - tx := ingest.LedgerTransaction{} - tx.Index = transaction.Index - tx.Envelope = transaction.Envelope - tx.Result = transaction.Result - tx.FeeChanges = transaction.FeeChanges - tx.UnsafeMeta = transaction.UnsafeMeta - return tx -} - -type ingestTransactionReaderAdaption struct { - *ingest.LedgerTransactionReader -} - -func (adaptation *ingestTransactionReaderAdaption) Read() (LedgerTransaction, error) { - tx := LedgerTransaction{} - ingestLedgerTransaction, err := adaptation.LedgerTransactionReader.Read() - if err != nil { - return tx, err - } - - tx.Index = ingestLedgerTransaction.Index - tx.Envelope = ingestLedgerTransaction.Envelope - tx.Result = ingestLedgerTransaction.Result - tx.FeeChanges = ingestLedgerTransaction.FeeChanges - tx.UnsafeMeta = 
ingestLedgerTransaction.UnsafeMeta - - return tx, nil -} - -var _ Archive = (*ingestArchive)(nil) // ensure conformity to the interface diff --git a/exp/lighthorizon/archive/main.go b/exp/lighthorizon/archive/main.go deleted file mode 100644 index 882cd3397b..0000000000 --- a/exp/lighthorizon/archive/main.go +++ /dev/null @@ -1,59 +0,0 @@ -package archive - -import ( - "context" - - "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/xdr" -) - -// checkpointsToLookup defines a number of checkpoints to check when filling -// a list of objects up to a requested limit. In the old ledgers in pubnet -// many ledgers or even checkpoints were empty. This means that when building -// a list of 200 operations ex. starting at first ledger, lighthorizon will -// have to download many ledgers until it's able to fill the list completely. -// This can be solved by keeping an index/list of empty ledgers. -// TODO: make this configurable. -// -//lint:ignore U1000 Ignore unused temporarily -const checkpointsToLookup = 1 - -// LightHorizon data model -type LedgerTransaction struct { - Index uint32 - Envelope xdr.TransactionEnvelope - Result xdr.TransactionResultPair - FeeChanges xdr.LedgerEntryChanges - UnsafeMeta xdr.TransactionMeta -} - -type LedgerTransactionReader interface { - Read() (LedgerTransaction, error) -} - -// Archive here only has the methods LightHorizon cares about, to make caching/wrapping easier -type Archive interface { - - // GetLedger - takes a caller context and a sequence number and returns the meta data - // for the ledger corresponding to the sequence number. If there is any error, it will - // return nil and the error. - GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) - - // Close - will release any resources used for this archive instance and should be - // called at end of usage of archive. 
- Close() error - - // NewLedgerTransactionReaderFromLedgerCloseMeta - takes the passphrase for the blockchain network - // and the LedgerCloseMeta(meta data) and returns a reader that can be used to obtain a LedgerTransaction model - // from the meta data. If there is any error, it will return nil and the error. - NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) - - // GetTransactionParticipants - takes a LedgerTransaction and returns a set of all - // participants(accounts) in the transaction. If there is any error, it will return nil and the error. - GetTransactionParticipants(tx LedgerTransaction) (set.Set[string], error) - - // GetOperationParticipants - takes a LedgerTransaction, the Operation within the transaction, and - // the 0 based index of the operation within the transaction. It will return a set of all participants(accounts) - // in the operation. If there is any error, it will return nil and the error. 
- GetOperationParticipants(tx LedgerTransaction, op xdr.Operation, opIndex int) (set.Set[string], error) -} diff --git a/exp/lighthorizon/archive/mock_archive.go b/exp/lighthorizon/archive/mock_archive.go deleted file mode 100644 index 29c928314f..0000000000 --- a/exp/lighthorizon/archive/mock_archive.go +++ /dev/null @@ -1,47 +0,0 @@ -package archive - -import ( - "context" - - "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/mock" -) - -type MockLedgerTransactionReader struct { - mock.Mock -} - -func (m *MockLedgerTransactionReader) Read() (LedgerTransaction, error) { - args := m.Called() - return args.Get(0).(LedgerTransaction), args.Error(1) -} - -type MockArchive struct { - mock.Mock -} - -func (m *MockArchive) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { - args := m.Called(ctx, sequence) - return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1) -} - -func (m *MockArchive) Close() error { - args := m.Called() - return args.Error(0) -} - -func (m *MockArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) { - args := m.Called(networkPassphrase, ledgerCloseMeta) - return args.Get(0).(LedgerTransactionReader), args.Error(1) -} - -func (m *MockArchive) GetTransactionParticipants(tx LedgerTransaction) (set.Set[string], error) { - args := m.Called(tx) - return args.Get(0).(set.Set[string]), args.Error(1) -} - -func (m *MockArchive) GetOperationParticipants(tx LedgerTransaction, op xdr.Operation, opIndex int) (set.Set[string], error) { - args := m.Called(tx, op, opIndex) - return args.Get(0).(set.Set[string]), args.Error(1) -} diff --git a/exp/lighthorizon/common/transaction.go b/exp/lighthorizon/common/transaction.go index 4a8b2cfe09..104fd3bc6b 100644 --- a/exp/lighthorizon/common/transaction.go +++ b/exp/lighthorizon/common/transaction.go @@ -4,14 +4,14 @@ import ( 
"encoding/hex" "errors" - "github.com/stellar/go/exp/lighthorizon/archive" + "github.com/stellar/go/exp/lighthorizon/ingester" "github.com/stellar/go/network" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" ) type Transaction struct { - *archive.LedgerTransaction + *ingester.LedgerTransaction LedgerHeader *xdr.LedgerHeader TxIndex int32 diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index b3ce5509f5..6e7b4d6811 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -84,7 +84,7 @@ func BuildIndices( wg, ctx := errgroup.WithContext(ctx) ch := make(chan historyarchive.Range, parallel) - indexBuilder := NewIndexBuilder(indexStore, *metaArchive, networkPassphrase) + indexBuilder := NewIndexBuilder(indexStore, metaArchive, networkPassphrase) for _, part := range modules { switch part { case "transactions": diff --git a/exp/lighthorizon/ingester/ingester.go b/exp/lighthorizon/ingester/ingester.go new file mode 100644 index 0000000000..21bb400b50 --- /dev/null +++ b/exp/lighthorizon/ingester/ingester.go @@ -0,0 +1,55 @@ +package ingester + +import ( + "context" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/metaarchive" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/xdr" +) + +type IngesterConfig struct { + SourceUrl string + NetworkPassphrase string + + CacheDir string + CacheSize int + + ParallelDownloads uint +} + +type liteIngester struct { + metaarchive.MetaArchive + networkPassphrase string +} + +func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { + return nil +} + +func (i *liteIngester) NewLedgerTransactionReader( + ledgerCloseMeta xdr.SerializedLedgerCloseMeta, +) (LedgerTransactionReader, error) { + reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta( + i.networkPassphrase, + ledgerCloseMeta.MustV0()) + + return &liteLedgerTransactionReader{reader}, err +} + +type liteLedgerTransactionReader struct 
{ + *ingest.LedgerTransactionReader +} + +func (reader *liteLedgerTransactionReader) Read() (LedgerTransaction, error) { + ingestedTx, err := reader.LedgerTransactionReader.Read() + if err != nil { + return LedgerTransaction{}, err + } + return LedgerTransaction{LedgerTransaction: &ingestedTx}, nil +} + +var _ Ingester = (*liteIngester)(nil) // ensure conformity to the interface +var _ LedgerTransactionReader = (*liteLedgerTransactionReader)(nil) diff --git a/exp/lighthorizon/ingester/main.go b/exp/lighthorizon/ingester/main.go new file mode 100644 index 0000000000..a93636c67a --- /dev/null +++ b/exp/lighthorizon/ingester/main.go @@ -0,0 +1,87 @@ +package ingester + +import ( + "context" + "fmt" + "net/url" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/ingest" + "github.com/stellar/go/metaarchive" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/storage" + "github.com/stellar/go/xdr" +) + +// +// LightHorizon data model +// + +// Ingester combines a source of unpacked ledger metadata and a way to create a +// ingestion reader interface on top of it. +type Ingester interface { + metaarchive.MetaArchive + + PrepareRange(ctx context.Context, r historyarchive.Range) error + NewLedgerTransactionReader( + ledgerCloseMeta xdr.SerializedLedgerCloseMeta, + ) (LedgerTransactionReader, error) +} + +// For now, this mirrors the `ingest` library exactly, but it's replicated so +// that we can diverge in the future if necessary. +type LedgerTransaction struct { + *ingest.LedgerTransaction +} + +type LedgerTransactionReader interface { + Read() (LedgerTransaction, error) +} + +func NewIngester(config IngesterConfig) (Ingester, error) { + if config.CacheSize <= 0 { + return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) + } + + // Now, set up a simple filesystem-like access to the backend and wrap it in + // a local on-disk LRU cache if we can. 
+ source, err := historyarchive.ConnectBackend( + config.SourceUrl, + storage.ConnectOptions{Context: context.Background()}, + ) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl) + } + + parsed, err := url.Parse(config.SourceUrl) + if err != nil { + return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) + } + + if parsed.Scheme != "file" { // otherwise, already on-disk + cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize)) + + if errr != nil { // non-fatal: warn but continue w/o cache + log.WithField("path", config.CacheDir).WithError(errr). + Warnf("Failed to create cached ledger backend") + } else { + log.WithField("path", config.CacheDir). + Infof("On-disk cache configured") + source = cache + } + } + + if config.ParallelDownloads > 1 { + log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads) + return NewParallelIngester( + metaarchive.NewMetaArchive(source), + config.NetworkPassphrase, + config.ParallelDownloads), nil + } + + return &liteIngester{ + MetaArchive: metaarchive.NewMetaArchive(source), + networkPassphrase: config.NetworkPassphrase, + }, nil +} diff --git a/exp/lighthorizon/ingester/mock_ingester.go b/exp/lighthorizon/ingester/mock_ingester.go new file mode 100644 index 0000000000..62c377ce78 --- /dev/null +++ b/exp/lighthorizon/ingester/mock_ingester.go @@ -0,0 +1,44 @@ +package ingester + +import ( + "context" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/mock" +) + +type MockIngester struct { + mock.Mock +} + +func (m *MockIngester) NewLedgerTransactionReader( + ledgerCloseMeta xdr.SerializedLedgerCloseMeta, +) (LedgerTransactionReader, error) { + args := m.Called(ledgerCloseMeta) + return args.Get(0).(LedgerTransactionReader), args.Error(1) +} + +func (m *MockIngester) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { + args := m.Called(ctx) + 
return args.Get(0).(uint32), args.Error(1) +} + +func (m *MockIngester) GetLedger(ctx context.Context, sequence uint32) (xdr.SerializedLedgerCloseMeta, error) { + args := m.Called(ctx, sequence) + return args.Get(0).(xdr.SerializedLedgerCloseMeta), args.Error(1) +} + +func (m *MockIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { + args := m.Called(ctx, r) + return args.Error(0) +} + +type MockLedgerTransactionReader struct { + mock.Mock +} + +func (m *MockLedgerTransactionReader) Read() (LedgerTransaction, error) { + args := m.Called() + return args.Get(0).(LedgerTransaction), args.Error(1) +} diff --git a/exp/lighthorizon/ingester/parallel_ingester.go b/exp/lighthorizon/ingester/parallel_ingester.go new file mode 100644 index 0000000000..133b0a37c4 --- /dev/null +++ b/exp/lighthorizon/ingester/parallel_ingester.go @@ -0,0 +1,141 @@ +package ingester + +import ( + "context" + "sync" + "time" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/metaarchive" + "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" +) + +type parallelIngester struct { + liteIngester + + ledgerFeed sync.Map // thread-safe version of map[uint32]downloadState + ledgerQueue set.ISet[uint32] + + workQueue chan uint32 + signalChan chan error +} + +type downloadState struct { + ledger xdr.SerializedLedgerCloseMeta + err error +} + +// NewParallelIngester creates an ingester on the given `ledgerSource` using the +// given `networkPassphrase` that can download ledgers in parallel via +// `workerCount` workers via `PrepareRange()`. 
+func NewParallelIngester( + archive metaarchive.MetaArchive, + networkPassphrase string, + workerCount uint, +) *parallelIngester { + self := ¶llelIngester{ + liteIngester: liteIngester{ + MetaArchive: archive, + networkPassphrase: networkPassphrase, + }, + ledgerFeed: sync.Map{}, + ledgerQueue: set.NewSafeSet[uint32](64), + workQueue: make(chan uint32, workerCount), + signalChan: make(chan error), + } + + // These are the workers that download & store ledgers in memory. + for j := uint(0); j < workerCount; j++ { + go func(jj uint) { + for ledgerSeq := range self.workQueue { + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + txmeta, err := self.liteIngester.GetLedger(ctx, ledgerSeq) + cancel() + + log.WithField("duration", time.Since(start)). + WithField("worker", jj).WithError(err). + Debugf("Downloaded ledger %d", ledgerSeq) + + self.ledgerFeed.Store(ledgerSeq, downloadState{txmeta, err}) + self.signalChan <- err + } + }(j) + } + + return self +} + +// PrepareRange will create a set of parallel worker routines that feed ledgers +// to a channel in the order they're downloaded and store the results in an +// array. You can use this to download ledgers in parallel to fetching them +// individually via `GetLedger()`. `PrepareRange()` is thread-safe. +// +// Note: The passed in range `r` is inclusive of the boundaries. +func (i *parallelIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { + // The taskmaster adds ledger sequence numbers to the work queue. + go func() { + start := time.Now() + defer func() { + log.WithField("duration", time.Since(start)). + WithError(ctx.Err()). 
+ Infof("Download of ledger range: [%d, %d] (%d ledgers) complete", + r.Low, r.High, r.Size()) + }() + + for seq := r.Low; seq <= r.High; seq++ { + if ctx.Err() != nil { + log.Warnf("Cancelling remaining downloads ([%d, %d]): %v", + seq, r.High, ctx.Err()) + break + } + + // Adding this to the "set of ledgers being downloaded in parallel" + // means that if a GetLedger() request happens in this range but + // outside of the realm of processing, it can be prioritized by the + // normal, direct download. + i.ledgerQueue.Add(seq) + + i.workQueue <- seq // blocks until there's an available worker + + // We don't remove from the queue here, preferring to remove when + // it's actually pulled from the worker. Removing here would mean + // you could have multiple instances of a ledger download happening. + } + }() + + return nil +} + +func (i *parallelIngester) GetLedger( + ctx context.Context, ledgerSeq uint32, +) (xdr.SerializedLedgerCloseMeta, error) { + // If the requested ledger is out of the queued up ranges, we can fall back + // to the default non-parallel download method. + if !i.ledgerQueue.Contains(ledgerSeq) { + return i.liteIngester.GetLedger(ctx, ledgerSeq) + } + + // If the ledger isn't available yet, wait for the download worker. 
+ var err error + for err == nil { + if iState, ok := i.ledgerFeed.Load(ledgerSeq); ok { + state := iState.(downloadState) + i.ledgerFeed.Delete(ledgerSeq) + i.ledgerQueue.Remove(ledgerSeq) + return state.ledger, state.err + } + + select { + case err = <-i.signalChan: // blocks until another ledger downloads + case <-ctx.Done(): + err = ctx.Err() + } + } + + return xdr.SerializedLedgerCloseMeta{}, err +} + +var _ Ingester = (*parallelIngester)(nil) // ensure conformity to the interface diff --git a/exp/lighthorizon/ingester/participants.go b/exp/lighthorizon/ingester/participants.go new file mode 100644 index 0000000000..ebc49173cf --- /dev/null +++ b/exp/lighthorizon/ingester/participants.go @@ -0,0 +1,35 @@ +package ingester + +import ( + "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/xdr" +) + +// GetTransactionParticipants takes a LedgerTransaction and returns a set of all +// participants (accounts) in the transaction. If there is any error, it will +// return nil and the error. +func GetTransactionParticipants(tx LedgerTransaction) (set.Set[string], error) { + participants, err := index.GetTransactionParticipants(*tx.LedgerTransaction) + if err != nil { + return nil, err + } + set := set.NewSet[string](len(participants)) + set.AddSlice(participants) + return set, nil +} + +// GetOperationParticipants takes a LedgerTransaction, the Operation within the +// transaction, and the 0-based index of the operation within the transaction. +// It will return a set of all participants (accounts) in the operation. If +// there is any error, it will return nil and the error. 
+func GetOperationParticipants(tx LedgerTransaction, op xdr.Operation, opIndex int) (set.Set[string], error) { + participants, err := index.GetOperationParticipants(*tx.LedgerTransaction, op, opIndex) + if err != nil { + return nil, err + } + + set := set.NewSet[string](len(participants)) + set.AddSlice(participants) + return set, nil +} diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index d6c982e1f4..3427f4a13c 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -9,8 +9,8 @@ import ( "github.com/spf13/cobra" "github.com/stellar/go/exp/lighthorizon/actions" - "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/exp/lighthorizon/ingester" "github.com/stellar/go/exp/lighthorizon/services" "github.com/stellar/go/exp/lighthorizon/tools" @@ -62,6 +62,7 @@ break down accounts by active ledgers.`, cacheDir, _ := cmd.Flags().GetString("ledger-cache") cacheSize, _ := cmd.Flags().GetUint("ledger-cache-size") logLevelParam, _ := cmd.Flags().GetString("log-level") + downloadCount, _ := cmd.Flags().GetUint("parallel-downloads") L := log.WithField("service", "horizon-lite") logLevel, err := logrus.ParseLevel(logLevelParam) @@ -83,11 +84,12 @@ break down accounts by active ledgers.`, return } - ingester, err := archive.NewIngestArchive(archive.ArchiveConfig{ + ingester, err := ingester.NewIngester(ingester.IngesterConfig{ SourceUrl: sourceUrl, NetworkPassphrase: networkPassphrase, CacheDir: cacheDir, CacheSize: int(cacheSize), + ParallelDownloads: downloadCount, }) if err != nil { log.Fatal(err) @@ -95,7 +97,7 @@ break down accounts by active ledgers.`, } Config := services.Config{ - Archive: ingester, + Ingester: ingester, Passphrase: networkPassphrase, IndexStore: indexStore, Metrics: services.NewMetrics(registry), @@ -129,6 +131,8 @@ break down accounts by active ledgers.`, "if left empty, uses a temporary directory") serve.Flags().Uint("ledger-cache-size", 
defaultCacheSize, "number of ledgers to store in the cache") + serve.Flags().Uint("parallel-downloads", 1, + "how many workers should download ledgers in parallel?") cmd.AddCommand(serve) tools.AddCacheCommands(cmd) diff --git a/exp/lighthorizon/services/cursor.go b/exp/lighthorizon/services/cursor.go index 673a57fce5..8f2d2b0b5c 100644 --- a/exp/lighthorizon/services/cursor.go +++ b/exp/lighthorizon/services/cursor.go @@ -9,7 +9,7 @@ import ( // particular indexing strategy. type CursorManager interface { Begin(cursor int64) (int64, error) - Advance() (int64, error) + Advance(times uint) (int64, error) } type AccountActivityCursorManager struct { @@ -56,35 +56,39 @@ func (c *AccountActivityCursorManager) Begin(cursor int64) (int64, error) { return c.lastCursor.ToInt64(), nil } -func (c *AccountActivityCursorManager) Advance() (int64, error) { +func (c *AccountActivityCursorManager) Advance(times uint) (int64, error) { if c.lastCursor == nil { panic("invalid cursor, call Begin() first") } - // Advancing the cursor means deciding whether or not we need to query the - // index. - - lastLedger := uint32(c.lastCursor.LedgerSequence) + // + // Advancing the cursor means deciding whether or not we need to query + // the index. + // freq := checkpointManager.GetCheckpointFrequency() - if checkpointManager.IsCheckpoint(lastLedger) { - // If the last cursor we looked at was a checkpoint ledger, then we need - // to jump ahead to the next checkpoint. Note that NextActive() is - // "inclusive" so if the parameter is an active checkpoint it will - // return itself. 
- checkpoint := index.GetCheckpointNumber(uint32(c.lastCursor.LedgerSequence)) - checkpoint, err := c.store.NextActive(c.AccountId, allTransactionsIndex, checkpoint+1) - if err != nil { - return c.lastCursor.ToInt64(), err + for i := uint(1); i <= times; i++ { + lastLedger := uint32(c.lastCursor.LedgerSequence) + + if checkpointManager.IsCheckpoint(lastLedger) { + // If the last cursor we looked at was a checkpoint ledger, then we + // need to jump ahead to the next checkpoint. Note that NextActive() + // is "inclusive" so if the parameter is an active checkpoint it + // will return itself. + checkpoint := index.GetCheckpointNumber(uint32(c.lastCursor.LedgerSequence)) + checkpoint, err := c.store.NextActive(c.AccountId, allTransactionsIndex, checkpoint+1) + if err != nil { + return c.lastCursor.ToInt64(), err + } + + // We add a -1 here because an active checkpoint indicates that an + // account had activity in the *previous* 64 ledgers, so we need to + // backtrack to that ledger range. + c.lastCursor = toid.New(int32((checkpoint-1)*freq), 1, 1) + } else { + // Otherwise, we can just bump the ledger number. + c.lastCursor = toid.New(int32(lastLedger+1), 1, 1) } - - // We add a -1 here because an active checkpoint indicates that an - // account had activity in the *previous* 64 ledgers, so we need to - // backtrack to that ledger range. - c.lastCursor = toid.New(int32((checkpoint-1)*freq), 1, 1) - } else { - // Otherwise, we can just bump the ledger number. 
- c.lastCursor = toid.New(int32(lastLedger+1), 1, 1) } return c.lastCursor.ToInt64(), nil diff --git a/exp/lighthorizon/services/cursor_test.go b/exp/lighthorizon/services/cursor_test.go index 1fa07bd14a..2112ae3715 100644 --- a/exp/lighthorizon/services/cursor_test.go +++ b/exp/lighthorizon/services/cursor_test.go @@ -30,7 +30,7 @@ func TestAccountTransactionCursorManager(t *testing.T) { ) require.NoError(t, err) - for _, checkpoint := range []uint32{1, 5, 10} { + for _, checkpoint := range []uint32{1, 5, 10, 12} { require.NoError(t, store.AddParticipantsToIndexes( checkpoint, allTransactionsIndex, []string{accountId})) } @@ -57,25 +57,40 @@ func TestAccountTransactionCursorManager(t *testing.T) { require.NoError(t, err) assert.EqualValues(t, 4*freq, getLedgerFromCursor(nextCursor)) + // cursor increments for i := int32(1); i < freq; i++ { - nextCursor, err = cursorMgr.Advance() + nextCursor, err = cursorMgr.Advance(1) require.NoError(t, err) assert.EqualValues(t, 4*freq+i, getLedgerFromCursor(nextCursor)) } // cursor jumps to next active checkpoint - nextCursor, err = cursorMgr.Advance() + nextCursor, err = cursorMgr.Advance(1) require.NoError(t, err) assert.EqualValues(t, 9*freq, getLedgerFromCursor(nextCursor)) - // cursor increments - for i := int32(1); i < freq; i++ { - nextCursor, err = cursorMgr.Advance() - require.NoError(t, err) - assert.EqualValues(t, 9*freq+i, getLedgerFromCursor(nextCursor)) - } + // cursor skips + nextCursor, err = cursorMgr.Advance(5) + require.NoError(t, err) + assert.EqualValues(t, 9*freq+5, getLedgerFromCursor(nextCursor)) - // cursor stops when no more actives - _, err = cursorMgr.Advance() + // cursor jumps to next active when skipping + nextCursor, err = cursorMgr.Advance(uint(freq - 5)) + require.NoError(t, err) + assert.EqualValues(t, 11*freq, getLedgerFromCursor(nextCursor)) + + // cursor EOFs at the end + nextCursor, err = cursorMgr.Advance(uint(freq - 1)) + require.NoError(t, err) + assert.EqualValues(t, 12*freq-1, 
getLedgerFromCursor(nextCursor)) + _, err = cursorMgr.Advance(1) + assert.ErrorIs(t, err, io.EOF) + + // cursor EOFs if skipping past the end + rewind := toid.New(int32(getLedgerFromCursor(nextCursor)-5), 0, 0) + nextCursor, err = cursorMgr.Begin(rewind.ToInt64()) + require.NoError(t, err) + assert.EqualValues(t, rewind.LedgerSequence, getLedgerFromCursor(nextCursor)) + _, err = cursorMgr.Advance(uint(freq)) assert.ErrorIs(t, err, io.EOF) } diff --git a/exp/lighthorizon/services/main.go b/exp/lighthorizon/services/main.go index 0a2f7ab5f9..d65e5420fd 100644 --- a/exp/lighthorizon/services/main.go +++ b/exp/lighthorizon/services/main.go @@ -6,19 +6,20 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/constraints" - "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/exp/lighthorizon/ingester" "github.com/stellar/go/historyarchive" - "github.com/stellar/go/xdr" - "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" ) const ( - allTransactionsIndex = "all/all" - allPaymentsIndex = "all/payments" + allTransactionsIndex = "all/all" + allPaymentsIndex = "all/payments" + slowFetchDurationThreshold = time.Second ) var ( @@ -64,7 +65,7 @@ type Metrics struct { } type Config struct { - Archive archive.Archive + Ingester ingester.Ingester IndexStore index.Store Passphrase string Metrics Metrics @@ -73,7 +74,7 @@ type Config struct { // searchCallback is a generic way for any endpoint to process a transaction and // its corresponding ledger. It should return whether or not we should stop // processing (e.g. when a limit is reached) and any error that occurred. 
-type searchCallback func(archive.LedgerTransaction, *xdr.LedgerHeader) (finished bool, err error) +type searchCallback func(ingester.LedgerTransaction, *xdr.LedgerHeader) (finished bool, err error) func searchAccountTransactions(ctx context.Context, cursor int64, @@ -88,48 +89,74 @@ func searchAccountTransactions(ctx context.Context, } else if err != nil { return err } - nextLedger := getLedgerFromCursor(cursor) - log.Debugf("Searching %s for account %s starting at ledger %d", - allTransactionsIndex, accountId, nextLedger) + + log.WithField("cursor", cursor). + Debugf("Searching %s for account %s starting at ledger %d", + allTransactionsIndex, accountId, nextLedger) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() fullStart := time.Now() - avgFetchDuration := time.Duration(0) - avgProcessDuration := time.Duration(0) - avgIndexFetchDuration := time.Duration(0) + fetchDuration := time.Duration(0) + processDuration := time.Duration(0) + indexFetchDuration := time.Duration(0) count := int64(0) defer func() { log.WithField("ledgers", count). - WithField("ledger-fetch", avgFetchDuration.String()). - WithField("ledger-process", avgProcessDuration.String()). - WithField("index-fetch", avgIndexFetchDuration.String()). + WithField("ledger-fetch", fetchDuration). + WithField("ledger-process", processDuration). + WithField("index-fetch", indexFetchDuration). + WithField("avg-ledger-fetch", getAverageDuration(fetchDuration, count)). + WithField("avg-ledger-process", getAverageDuration(processDuration, count)). + WithField("avg-index-fetch", getAverageDuration(indexFetchDuration, count)). WithField("total", time.Since(fullStart)). 
Infof("Fulfilled request for account %s at cursor %d", accountId, cursor) }() + checkpointMgr := historyarchive.NewCheckpointManager(0) + for { - count++ + if checkpointMgr.IsCheckpoint(nextLedger) { + r := historyarchive.Range{ + Low: nextLedger, + High: checkpointMgr.NextCheckpoint(nextLedger + 1), + } + log.Infof("prepare range %d, %d", r.Low, r.High) + if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil { + log.Errorf("failed to prepare ledger range [%d, %d]: %v", + r.Low, r.High, innerErr) + } + } + start := time.Now() - ledger, ledgerErr := config.Archive.GetLedger(ctx, nextLedger) - if ledgerErr != nil { - return errors.Wrapf(ledgerErr, - "ledger export state is out of sync at ledger %d", nextLedger) + ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to retrieve ledger %d from archive", nextLedger) } - fetchDuration := time.Since(start) - if fetchDuration > time.Second { - log.WithField("duration", fetchDuration). + count++ + thisFetchDuration := time.Since(start) + if thisFetchDuration > slowFetchDurationThreshold { + log.WithField("duration", thisFetchDuration). 
Warnf("Fetching ledger %d was really slow", nextLedger) } - incrementAverage(&avgFetchDuration, fetchDuration, count) + fetchDuration += thisFetchDuration start = time.Now() - reader, readerErr := config.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(config.Passphrase, ledger) - if readerErr != nil { - return readerErr + reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to read ledger %d", nextLedger) } for { + if ctx.Err() != nil { + return ctx.Err() + } + tx, readErr := reader.Read() if readErr == io.EOF { break @@ -138,50 +165,44 @@ func searchAccountTransactions(ctx context.Context, } // Note: If we move to ledger-based indices, we don't need this, - // since we have a guarantee that the transaction will contain the - // account as a participant. - participants, participantErr := config.Archive.GetTransactionParticipants(tx) + // since we have a guarantee that the transaction will contain + // the account as a participant. 
+ participants, participantErr := ingester.GetTransactionParticipants(tx) if participantErr != nil { return participantErr } if _, found := participants[accountId]; found { - finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header) + finished, callBackErr := callback(tx, &ledger.V0.V0.LedgerHeader.Header) if callBackErr != nil { return callBackErr } else if finished { - incrementAverage(&avgProcessDuration, time.Since(start), count) + processDuration += time.Since(start) return nil } } - - if ctx.Err() != nil { - return ctx.Err() - } } - incrementAverage(&avgProcessDuration, time.Since(start), count) - + processDuration += time.Since(start) start = time.Now() - cursor, err = cursorMgr.Advance() + + cursor, err = cursorMgr.Advance(1) if err != nil && err != io.EOF { return err } nextLedger = getLedgerFromCursor(cursor) - incrementAverage(&avgIndexFetchDuration, time.Since(start), count) + indexFetchDuration += time.Since(start) if err == io.EOF { - return nil + break } } + + return nil } -// This calculates the average by incorporating a new value into an existing -// average in place. Note that `newCount` should represent the *new* total -// number of values incorporated into the average. 
-// -// Reference: https://math.stackexchange.com/a/106720 -func incrementAverage(prevAverage *time.Duration, latest time.Duration, newCount int64) { - increment := int64(latest-*prevAverage) / newCount - *prevAverage = *prevAverage + time.Duration(increment) +func getAverageDuration[ + T constraints.Signed | constraints.Float, +](d time.Duration, count T) time.Duration { + return time.Duration(int64(float64(d.Nanoseconds()) / float64(count))) } diff --git a/exp/lighthorizon/services/main_test.go b/exp/lighthorizon/services/main_test.go index 33ff614a8e..a8a3958214 100644 --- a/exp/lighthorizon/services/main_test.go +++ b/exp/lighthorizon/services/main_test.go @@ -7,9 +7,9 @@ import ( "github.com/prometheus/client_golang/prometheus" - "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/index" - "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/exp/lighthorizon/ingester" + "github.com/stellar/go/ingest" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" "github.com/stretchr/testify/mock" @@ -17,6 +17,7 @@ import ( ) var ( + passphrase = "White New England clam chowder" accountId = "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" startLedgerSeq = 1586112 ) @@ -94,12 +95,12 @@ func TestItGetsOperationsByAccount(t *testing.T) { }) } -func mockArchiveAndIndex(ctx context.Context, passphrase string) (archive.Archive, index.Store) { - mockArchive := &archive.MockArchive{} - mockReaderLedger1 := &archive.MockLedgerTransactionReader{} - mockReaderLedger2 := &archive.MockLedgerTransactionReader{} - mockReaderLedger3 := &archive.MockLedgerTransactionReader{} - mockReaderLedgerTheRest := &archive.MockLedgerTransactionReader{} +func mockArchiveAndIndex(ctx context.Context) (ingester.Ingester, index.Store) { + mockArchive := &ingester.MockIngester{} + mockReaderLedger1 := &ingester.MockLedgerTransactionReader{} + mockReaderLedger2 := &ingester.MockLedgerTransactionReader{} + mockReaderLedger3 := 
&ingester.MockLedgerTransactionReader{} + mockReaderLedgerTheRest := &ingester.MockLedgerTransactionReader{} expectedLedger1 := testLedger(startLedgerSeq) expectedLedger2 := testLedger(startLedgerSeq + 1) @@ -117,90 +118,74 @@ func mockArchiveAndIndex(ctx context.Context, passphrase string) (archive.Archiv expectedLedger3Tx1 := testLedgerTx(source2, 1, 34) expectedLedger3Tx2 := testLedgerTx(source, 2, 34) - mockArchive. - On("GetLedger", ctx, uint32(1586112)).Return(expectedLedger1, nil). - On("GetLedger", ctx, uint32(1586113)).Return(expectedLedger2, nil). - On("GetLedger", ctx, uint32(1586114)).Return(expectedLedger3, nil). - On("GetLedger", ctx, mock.Anything).Return(xdr.LedgerCloseMeta{}, nil) - - mockArchive. - On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger1).Return(mockReaderLedger1, nil). - On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger2).Return(mockReaderLedger2, nil). - On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger3).Return(mockReaderLedger3, nil). - On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, mock.Anything).Return(mockReaderLedgerTheRest, nil) - - partialParticipants := set.Set[string]{} - partialParticipants.Add(source.Address()) - - allParticipants := set.Set[string]{} - allParticipants.Add(source.Address()) - allParticipants.Add(source2.Address()) - - mockArchive. - On("GetTransactionParticipants", expectedLedger1Tx1).Return(partialParticipants, nil). - On("GetTransactionParticipants", expectedLedger1Tx2).Return(partialParticipants, nil). - On("GetTransactionParticipants", expectedLedger2Tx1).Return(partialParticipants, nil). - On("GetTransactionParticipants", expectedLedger2Tx2).Return(allParticipants, nil). - On("GetTransactionParticipants", expectedLedger3Tx1).Return(allParticipants, nil). - On("GetTransactionParticipants", expectedLedger3Tx2).Return(partialParticipants, nil) - - mockArchive. 
- On("GetOperationParticipants", expectedLedger1Tx1, mock.Anything, int(0)).Return(partialParticipants, nil). - On("GetOperationParticipants", expectedLedger1Tx1, mock.Anything, int(1)).Return(partialParticipants, nil). - On("GetOperationParticipants", expectedLedger1Tx2, mock.Anything, int(0)).Return(partialParticipants, nil). - On("GetOperationParticipants", expectedLedger2Tx1, mock.Anything, int(0)).Return(partialParticipants, nil). - On("GetOperationParticipants", expectedLedger2Tx2, mock.Anything, int(0)).Return(allParticipants, nil). - On("GetOperationParticipants", expectedLedger3Tx1, mock.Anything, int(0)).Return(allParticipants, nil). - On("GetOperationParticipants", expectedLedger3Tx2, mock.Anything, int(0)).Return(partialParticipants, nil) - mockReaderLedger1. On("Read").Return(expectedLedger1Tx1, nil).Once(). On("Read").Return(expectedLedger1Tx2, nil).Once(). - On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + On("Read").Return(ingester.LedgerTransaction{}, io.EOF).Once() mockReaderLedger2. On("Read").Return(expectedLedger2Tx1, nil).Once(). On("Read").Return(expectedLedger2Tx2, nil).Once(). - On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + On("Read").Return(ingester.LedgerTransaction{}, io.EOF).Once() mockReaderLedger3. On("Read").Return(expectedLedger3Tx1, nil).Once(). On("Read").Return(expectedLedger3Tx2, nil).Once(). - On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + On("Read").Return(ingester.LedgerTransaction{}, io.EOF).Once() mockReaderLedgerTheRest. - On("Read").Return(archive.LedgerTransaction{}, io.EOF) + On("Read").Return(ingester.LedgerTransaction{}, io.EOF) + + mockArchive. + On("GetLedger", mock.Anything, uint32(1586112)).Return(expectedLedger1, nil). + On("GetLedger", mock.Anything, uint32(1586113)).Return(expectedLedger2, nil). + On("GetLedger", mock.Anything, uint32(1586114)).Return(expectedLedger3, nil). + On("GetLedger", mock.Anything, mock.AnythingOfType("uint32")). 
+ Return(xdr.SerializedLedgerCloseMeta{}, nil) + + mockArchive. + On("NewLedgerTransactionReader", expectedLedger1).Return(mockReaderLedger1, nil).Once(). + On("NewLedgerTransactionReader", expectedLedger2).Return(mockReaderLedger2, nil).Once(). + On("NewLedgerTransactionReader", expectedLedger3).Return(mockReaderLedger3, nil).Once(). + On("NewLedgerTransactionReader", mock.AnythingOfType("xdr.SerializedLedgerCloseMeta")). + Return(mockReaderLedgerTheRest, nil). + On("PrepareRange", mock.Anything, mock.Anything).Return(nil) // should be 24784 - firstActiveChk := uint32(index.GetCheckpointNumber(uint32(startLedgerSeq))) + activeChk := uint32(index.GetCheckpointNumber(uint32(startLedgerSeq))) mockStore := &index.MockStore{} mockStore. - On("NextActive", accountId, mock.Anything, uint32(0)).Return(firstActiveChk, nil). - On("NextActive", accountId, mock.Anything, firstActiveChk-1).Return(firstActiveChk, nil). - On("NextActive", accountId, mock.Anything, firstActiveChk).Return(firstActiveChk, nil). - On("NextActive", accountId, mock.Anything, firstActiveChk+1).Return(firstActiveChk+1, nil). - On("NextActive", accountId, mock.Anything, firstActiveChk+2).Return(uint32(0), io.EOF) + On("NextActive", accountId, mock.Anything, uint32(0)).Return(activeChk, nil). // start + On("NextActive", accountId, mock.Anything, activeChk-1).Return(activeChk, nil). // prev + On("NextActive", accountId, mock.Anything, activeChk).Return(activeChk, nil). 
// curr + On("NextActive", accountId, mock.Anything, activeChk+1).Return(uint32(0), io.EOF) // next return mockArchive, mockStore } -func testLedger(seq int) xdr.LedgerCloseMeta { - return xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(seq), +func testLedger(seq int) xdr.SerializedLedgerCloseMeta { + return xdr.SerializedLedgerCloseMeta{ + V: 0, + V0: &xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(seq), + }, }, }, }, } } -func testLedgerTx(source xdr.AccountId, txIndex uint32, bumpTos ...int) archive.LedgerTransaction { - ops := []xdr.Operation{} +func testLedgerTx(source xdr.AccountId, txIndex uint32, bumpTos ...int) ingester.LedgerTransaction { + code := xdr.TransactionResultCodeTxSuccess + + operations := []xdr.Operation{} for _, bumpTo := range bumpTos { - ops = append(ops, xdr.Operation{ + operations = append(operations, xdr.Operation{ Body: xdr.OperationBody{ + Type: xdr.OperationTypeBumpSequence, BumpSequenceOp: &xdr.BumpSequenceOp{ BumpTo: xdr.SequenceNumber(bumpTo), }, @@ -208,29 +193,43 @@ func testLedgerTx(source xdr.AccountId, txIndex uint32, bumpTos ...int) archive. 
}) } - tx := archive.LedgerTransaction{ - Envelope: xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - SourceAccount: source.ToMuxedAccount(), - Fee: xdr.Uint32(1), - Operations: ops, + return ingester.LedgerTransaction{ + LedgerTransaction: &ingest.LedgerTransaction{ + Result: xdr.TransactionResultPair{ + TransactionHash: xdr.Hash{}, + Result: xdr.TransactionResult{ + Result: xdr.TransactionResultResult{ + Code: code, + InnerResultPair: &xdr.InnerTransactionResultPair{}, + Results: &[]xdr.OperationResult{}, + }, }, }, + Envelope: xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: source.ToMuxedAccount(), + Operations: operations, + }, + }, + }, + UnsafeMeta: xdr.TransactionMeta{ + V: 2, + V2: &xdr.TransactionMetaV2{ + Operations: make([]xdr.OperationMeta, len(bumpTos)), + }, + }, + Index: txIndex, }, - Index: txIndex, } - - return tx } func newTransactionService(ctx context.Context) TransactionService { - passphrase := "White New England clam chowder" - archive, store := mockArchiveAndIndex(ctx, passphrase) + ingest, store := mockArchiveAndIndex(ctx) return &TransactionRepository{ Config: Config{ - Archive: archive, + Ingester: ingest, IndexStore: store, Passphrase: passphrase, Metrics: NewMetrics(prometheus.NewRegistry()), @@ -239,11 +238,10 @@ func newTransactionService(ctx context.Context) TransactionService { } func newOperationService(ctx context.Context) OperationService { - passphrase := "White New England clam chowder" - archive, store := mockArchiveAndIndex(ctx, passphrase) + ingest, store := mockArchiveAndIndex(ctx) return &OperationRepository{ Config: Config{ - Archive: archive, + Ingester: ingest, IndexStore: store, Passphrase: passphrase, Metrics: NewMetrics(prometheus.NewRegistry()), diff --git a/exp/lighthorizon/services/operations.go b/exp/lighthorizon/services/operations.go index 
a3dd463c20..1236bcdb01 100644 --- a/exp/lighthorizon/services/operations.go +++ b/exp/lighthorizon/services/operations.go @@ -6,8 +6,8 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/common" + "github.com/stellar/go/exp/lighthorizon/ingester" "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -30,9 +30,9 @@ func (or *OperationRepository) GetOperationsByAccount(ctx context.Context, ) ([]common.Operation, error) { ops := []common.Operation{} - opsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { + opsCallback := func(tx ingester.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { for operationOrder, op := range tx.Envelope.Operations() { - opParticipants, err := or.Config.Archive.GetOperationParticipants(tx, op, operationOrder) + opParticipants, err := ingester.GetOperationParticipants(tx, op, operationOrder) if err != nil { return false, err } diff --git a/exp/lighthorizon/services/transactions.go b/exp/lighthorizon/services/transactions.go index 91f46ab4b3..42d3964614 100644 --- a/exp/lighthorizon/services/transactions.go +++ b/exp/lighthorizon/services/transactions.go @@ -6,8 +6,8 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/common" + "github.com/stellar/go/exp/lighthorizon/ingester" "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -30,7 +30,7 @@ func (tr *TransactionRepository) GetTransactionsByAccount(ctx context.Context, ) ([]common.Transaction, error) { txs := []common.Transaction{} - txsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { + txsCallback := func(tx ingester.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { txs = append(txs, common.Transaction{ LedgerTransaction: &tx, 
LedgerHeader: ledgerHeader, diff --git a/exp/lighthorizon/tools/cache.go b/exp/lighthorizon/tools/cache.go index 2913499bac..8f06081f85 100644 --- a/exp/lighthorizon/tools/cache.go +++ b/exp/lighthorizon/tools/cache.go @@ -209,10 +209,10 @@ func PurgeLedgers(cacheDir string, start, end uint32) error { if err := os.Remove(ledgerPath); err != nil { log.Warnf("Failed to remove cached ledger %d: %v", i, err) continue - } else { - log.Debugf("Purged ledger from %s", ledgerPath) - successful++ } + os.Remove(storage.NameLockfile(ledgerPath)) // ignore lockfile errors + log.Debugf("Purged ledger from %s", ledgerPath) + successful++ } log.Infof("Purged %d cached ledgers from %s", successful, cacheDir) diff --git a/historyarchive/range.go b/historyarchive/range.go index fda445ffbc..a81523b2b3 100644 --- a/historyarchive/range.go +++ b/historyarchive/range.go @@ -126,6 +126,10 @@ func (r Range) InRange(sequence uint32) bool { return sequence >= r.Low && sequence <= r.High } +func (r Range) Size() uint32 { + return 1 + (r.High - r.Low) +} + func fmtRangeList(vs []uint32, cManager CheckpointManager) string { slices.Sort(vs) diff --git a/ingest/ledgerbackend/history_archive_backend.go b/ingest/ledgerbackend/history_archive_backend.go index 5749fd1c82..331f43032d 100644 --- a/ingest/ledgerbackend/history_archive_backend.go +++ b/ingest/ledgerbackend/history_archive_backend.go @@ -9,10 +9,10 @@ import ( ) type HistoryArchiveBackend struct { - metaArchive *metaarchive.MetaArchive + metaArchive metaarchive.MetaArchive } -func NewHistoryArchiveBackend(metaArchive *metaarchive.MetaArchive) *HistoryArchiveBackend { +func NewHistoryArchiveBackend(metaArchive metaarchive.MetaArchive) *HistoryArchiveBackend { return &HistoryArchiveBackend{ metaArchive: metaArchive, } diff --git a/metaarchive/main.go b/metaarchive/main.go index 7eb675448d..7d06a46f9a 100644 --- a/metaarchive/main.go +++ b/metaarchive/main.go @@ -12,17 +12,20 @@ import ( "github.com/stellar/go/xdr" ) -type MetaArchive 
struct { +type MetaArchive interface { + GetLatestLedgerSequence(ctx context.Context) (uint32, error) + GetLedger(ctx context.Context, sequence uint32) (xdr.SerializedLedgerCloseMeta, error) +} + +type metaArchive struct { s storage.Storage } -func NewMetaArchive(b storage.Storage) *MetaArchive { - return &MetaArchive{ - s: b, - } +func NewMetaArchive(b storage.Storage) MetaArchive { + return &metaArchive{s: b} } -func (m *MetaArchive) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { +func (m *metaArchive) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { r, err := m.s.GetFile("latest") if os.IsNotExist(err) { return 2, nil @@ -41,7 +44,7 @@ func (m *MetaArchive) GetLatestLedgerSequence(ctx context.Context) (uint32, erro return uint32(parsed), nil } -func (m *MetaArchive) GetLedger(ctx context.Context, sequence uint32) (xdr.SerializedLedgerCloseMeta, error) { +func (m *metaArchive) GetLedger(ctx context.Context, sequence uint32) (xdr.SerializedLedgerCloseMeta, error) { var ledger xdr.SerializedLedgerCloseMeta r, err := m.s.GetFile("ledgers/" + strconv.FormatUint(uint64(sequence), 10)) if err != nil { diff --git a/support/collections/set/iset.go b/support/collections/set/iset.go new file mode 100644 index 0000000000..f379d322d1 --- /dev/null +++ b/support/collections/set/iset.go @@ -0,0 +1,12 @@ +package set + +type ISet[T comparable] interface { + Add(item T) + AddSlice(items []T) + Remove(item T) + Contains(item T) bool + Slice() []T +} + +var _ ISet[int] = (*Set[int])(nil) // ensure conformity to the interface +var _ ISet[int] = (*safeSet[int])(nil) diff --git a/support/collections/set/safeset.go b/support/collections/set/safeset.go index 5d2b50fc40..a2fa648682 100644 --- a/support/collections/set/safeset.go +++ b/support/collections/set/safeset.go @@ -6,8 +6,8 @@ import ( "golang.org/x/exp/constraints" ) -// safeSet is a simple, thread-safe set implementation. It must be created via -// NewSafeSet. 
+// safeSet is a simple, thread-safe set implementation. Note that it *must* be +// created via NewSafeSet. type safeSet[T constraints.Ordered] struct { Set[T] lock sync.RWMutex diff --git a/support/collections/set/set.go b/support/collections/set/set.go index 292b71c4f8..7c76a465a6 100644 --- a/support/collections/set/set.go +++ b/support/collections/set/set.go @@ -32,3 +32,5 @@ func (set Set[T]) Slice() []T { } return slice } + +var _ ISet[int] = (*Set[int])(nil) // ensure conformity to the interface diff --git a/support/collections/set/set_test.go b/support/collections/set/set_test.go index 4618de0471..74c6ecc1a2 100644 --- a/support/collections/set/set_test.go +++ b/support/collections/set/set_test.go @@ -7,10 +7,14 @@ import ( ) func TestSet(t *testing.T) { - s := Set[string]{} + s := NewSet[string](10) s.Add("sanity") require.True(t, s.Contains("sanity")) require.False(t, s.Contains("check")) + + s.AddSlice([]string{"a", "b", "c"}) + require.True(t, s.Contains("b")) + require.ElementsMatch(t, []string{"sanity", "a", "b", "c"}, s.Slice()) } func TestSafeSet(t *testing.T) { diff --git a/support/storage/ondisk_cache.go b/support/storage/ondisk_cache.go index 2f6c46a8f0..c8997d7e13 100644 --- a/support/storage/ondisk_cache.go +++ b/support/storage/ondisk_cache.go @@ -9,7 +9,9 @@ import ( "github.com/stellar/go/support/log" ) -// OnDiskCache fronts another storage with a local filesystem cache +// OnDiskCache fronts another storage with a local filesystem cache. It's +// thread-safe, meaning you can be actively caching a file and retrieve it at +// the same time without corruption, because retrieval will wait for the fetch. type OnDiskCache struct { Storage dir string @@ -65,10 +67,15 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { L := b.log.WithField("key", filepath) localPath := path.Join(b.dir, filepath) - // If the lockfile exists, we should defer to the remote source. 
- _, statErr := os.Stat(nameLockfile(localPath)) - - if _, ok := b.lru.Get(localPath); !ok || statErr == nil { + // If the lockfile exists, we should defer to the remote source but *not* + // update the cache, as it means there's an in-progress sync of the same + // file. + _, statErr := os.Stat(NameLockfile(localPath)) + if statErr == nil { + L.Debug("incomplete file in cache on disk") + L.Debug("retrieving file from remote backend") + return b.Storage.GetFile(filepath) + } else if _, ok := b.lru.Get(localPath); !ok { // If it doesn't exist in the cache, it might still exist on the disk if // we've restarted from an existing directory. local, err := os.Open(localPath) @@ -78,8 +85,7 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { return local, nil } - b.log.WithField("key", filepath). - Debug("retrieving file from remote backend") + L.Debug("retrieving file from remote backend") // Since it's not on-disk, pull it from the remote backend, shove it // into the cache, and write it to disk. @@ -93,12 +99,11 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { // If there's some local FS error, we can still continue with the // remote version, so just log it and continue. L.WithError(err).Error("caching ledger failed") - return remote, nil } return teeReadCloser(remote, local, func() error { - return os.Remove(nameLockfile(localPath)) + return os.Remove(NameLockfile(localPath)) }), nil } @@ -148,8 +153,7 @@ func (b *OnDiskCache) Size(filepath string) (int64, error) { } L.WithError(err).Debug("retrieving size of cached ledger failed") - b.lru.Remove(localPath) // stale cache? - os.Remove(nameLockfile(localPath)) // errors don't matter + b.lru.Remove(localPath) // stale cache? 
} return b.Storage.Size(filepath) @@ -169,7 +173,7 @@ func (b *OnDiskCache) PutFile(filepath string, in io.ReadCloser) error { } else { // tee upload data into our local file in = teeReadCloser(in, local, func() error { - return os.Remove(nameLockfile(path.Join(b.dir, filepath))) + return os.Remove(NameLockfile(path.Join(b.dir, filepath))) }) } @@ -193,6 +197,7 @@ func (b *OnDiskCache) Evict(filepath string) { func (b *OnDiskCache) onEviction(key, value interface{}) { path := key.(string) + os.Remove(NameLockfile(path)) // just in case if err := os.Remove(path); err != nil { // best effort removal b.log.WithError(err). WithField("key", path). @@ -210,7 +215,7 @@ func (b *OnDiskCache) createLocal(filepath string) (*os.File, error) { if err != nil { return nil, err } - _, err = os.Create(nameLockfile(localPath)) + _, err = os.Create(NameLockfile(localPath)) if err != nil { return nil, err } @@ -219,7 +224,7 @@ func (b *OnDiskCache) createLocal(filepath string) (*os.File, error) { return local, nil } -func nameLockfile(file string) string { +func NameLockfile(file string) string { return file + ".lock" } @@ -239,9 +244,17 @@ func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.R return trc{ Reader: io.TeeReader(r, w), close: func() error { - r.Close() - w.Close() - return onClose() + // Always run all closers, but return the first error + err1 := r.Close() + err2 := w.Close() + err3 := onClose() + + if err1 != nil { + return err1 + } else if err2 != nil { + return err2 + } + return err3 }, } }