Skip to content

Commit

Permalink
exp/lighthorizon: Isolate cursor advancement code to its own interface (
Browse files Browse the repository at this point in the history
#4484)

* Move cursor manipulation code to a separate interface
* Small test refactor to improve readability and long-running lines
* Combine tx and op tests into subtests
* Fix how IndexStore is mocked out
  • Loading branch information
Shaptic authored Jul 28, 2022
1 parent 738befe commit 6a9373e
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 191 deletions.
4 changes: 3 additions & 1 deletion exp/lighthorizon/index/types/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ func (i *CheckpointIndex) Merge(other *CheckpointIndex) error {
return err
}

// NextActive returns the next checkpoint (inclusive) where this index is active.
// NextActive returns the next checkpoint (inclusive) where this index is
// active. "Inclusive" means that if the index is active at `checkpoint`, this
// returns `checkpoint`.
func (i *CheckpointIndex) NextActive(checkpoint uint32) (uint32, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
Expand Down
98 changes: 98 additions & 0 deletions exp/lighthorizon/services/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package services

import (
"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/toid"
)

// CursorManager describes a way to control how a cursor advances for a
// particular indexing strategy.
//
// The expected usage, as seen in searchTxByAccount, is one call to Begin with
// the caller-supplied cursor followed by repeated calls to Advance. Callers in
// this package treat an io.EOF error from either method as "nothing (more) to
// process" rather than as a failure.
type CursorManager interface {
// Begin takes the cursor requested by the caller and returns the cursor at
// which processing should actually start.
Begin(cursor int64) (int64, error)
// Advance returns the next cursor to process after the one most recently
// returned by Begin or Advance.
Advance() (int64, error)
}

// AccountActivityCursorManager is a CursorManager that decides which ledgers
// to visit for a single account by querying the allTransactionsIndex entries
// of an index.Store for that account's active checkpoints.
type AccountActivityCursorManager struct {
// AccountId is the account whose activity index is queried.
AccountId string

// store is the index queried through NextActive.
store index.Store
// lastCursor is the position most recently produced by Begin or Advance. It
// stays nil until Begin has succeeded; Advance panics while it is nil.
lastCursor *toid.ID
}

// NewCursorManagerForAccountActivity builds a cursor manager that consults
// store for the activity of accountId. The returned manager has no position
// yet: Begin must be called before Advance.
func NewCursorManagerForAccountActivity(store index.Store, accountId string) *AccountActivityCursorManager {
	mgr := new(AccountActivityCursorManager)
	mgr.AccountId = accountId
	mgr.store = store
	return mgr
}

// Begin positions the manager at the first ledger worth scanning for the
// account, given the caller-supplied cursor, and returns that position as a
// cursor.
//
// The cursor's ledger is mapped to a checkpoint and the index is asked for the
// next active checkpoint at or after it. If the ledger range of that checkpoint
// starts before the cursor's ledger, the cursor is already inside (or past the
// start of) an active range and is returned unchanged. Otherwise the returned
// cursor points at the first ledger of the active range.
//
// If the index lookup fails, the original cursor is returned together with the
// error and the manager's position is left untouched. Callers in this package
// treat io.EOF as "no activity at or after the cursor".
func (c *AccountActivityCursorManager) Begin(cursor int64) (int64, error) {
	freq := checkpointManager.GetCheckpointFrequency()
	id := toid.Parse(cursor)

	// Cursors that fall before the first full checkpoint range start the
	// index search from checkpoint 0.
	var lastCheckpoint uint32
	if id.LedgerSequence >= int32(freq) {
		lastCheckpoint = index.GetCheckpointNumber(uint32(id.LedgerSequence))
	}

	// We shouldn't take the provided cursor for granted: instead, we should
	// skip ahead to the first active ledger that's >= the given cursor.
	//
	// For example, someone might say ?cursor=0 but the first active checkpoint
	// is actually 40M ledgers in.
	firstCheckpoint, err := c.store.NextActive(c.AccountId, allTransactionsIndex, lastCheckpoint)
	if err != nil {
		return cursor, err
	}

	// An active checkpoint N describes activity in the ledger range that
	// starts at (N-1)*freq. Guard the subtraction so that an unexpected
	// checkpoint 0 cannot wrap around the unsigned arithmetic.
	// NOTE(review): the index presumably never reports checkpoint 0 - confirm.
	var nextLedger uint32
	if firstCheckpoint > 0 {
		nextLedger = (firstCheckpoint - 1) * freq
	}

	// However, if the given cursor is actually *more* specific than the index
	// can give us (e.g. somewhere *within* an active checkpoint range), prefer
	// it rather than starting over.
	if nextLedger < uint32(id.LedgerSequence) {
		c.lastCursor = &id
		return cursor, nil
	}

	c.lastCursor = toid.New(int32(nextLedger), 1, 1)
	return c.lastCursor.ToInt64(), nil
}

// Advance moves the manager to the next ledger to scan and returns it as a
// cursor. It panics if Begin has not successfully been called first.
//
// While the current ledger is not a checkpoint ledger, advancing is just a
// matter of stepping to the following ledger, without touching the index. Once
// the current ledger is a checkpoint ledger, the index is queried for the next
// active checkpoint and the cursor jumps to the first ledger of its range.
//
// If the index lookup fails, the current (unchanged) position is returned
// together with the error. Callers in this package treat io.EOF as "no more
// activity".
func (c *AccountActivityCursorManager) Advance() (int64, error) {
	if c.lastCursor == nil {
		panic("invalid cursor, call Begin() first")
	}

	current := uint32(c.lastCursor.LedgerSequence)

	// Still inside a checkpoint range: no index query needed, just bump the
	// ledger number.
	if !checkpointManager.IsCheckpoint(current) {
		c.lastCursor = toid.New(int32(current+1), 1, 1)
		return c.lastCursor.ToInt64(), nil
	}

	// The current ledger closes its checkpoint range, so look for the next
	// active checkpoint. NextActive() is "inclusive" (an active checkpoint
	// passed as the parameter is returned as-is), hence the search starts one
	// past the checkpoint we just finished.
	following := index.GetCheckpointNumber(current) + 1
	active, err := c.store.NextActive(c.AccountId, allTransactionsIndex, following)
	if err != nil {
		return c.lastCursor.ToInt64(), err
	}

	// An active checkpoint indicates activity in the *previous* range of
	// ledgers, so backtrack by one checkpoint to land on the first ledger of
	// that range.
	firstLedger := (active - 1) * checkpointManager.GetCheckpointFrequency()
	c.lastCursor = toid.New(int32(firstLedger), 1, 1)
	return c.lastCursor.ToInt64(), nil
}

// Compile-time check that *AccountActivityCursorManager conforms to the
// CursorManager interface.
var _ CursorManager = (*AccountActivityCursorManager)(nil)

// getLedgerFromCursor parses a cursor and returns the ledger sequence number
// it carries.
func getLedgerFromCursor(cursor int64) uint32 {
	parsed := toid.Parse(cursor)
	return uint32(parsed.LedgerSequence)
}
78 changes: 78 additions & 0 deletions exp/lighthorizon/services/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package services

import (
"io"
"testing"

"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/keypair"
"github.com/stellar/go/toid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
// checkpointMgr supplies the checkpoint frequency from which the tests
// derive their expected ledger numbers.
// NOTE(review): presumably NewCheckpointManager(0) selects the default
// checkpoint frequency - confirm against historyarchive.
checkpointMgr = historyarchive.NewCheckpointManager(0)
)

// TestAccountTransactionCursorManager exercises Begin and Advance on an
// AccountActivityCursorManager backed by a file-based index in which a random
// account is marked active at checkpoints 1, 5 and 10. The steps share one
// cursor manager and depend on each other's state, so their order matters.
func TestAccountTransactionCursorManager(t *testing.T) {
freq := int32(checkpointMgr.GetCheckpointFrequency())
accountId := keypair.MustRandom().Address()

// Create an index and fill it with some checkpoint details.
store, err := index.NewFileStore(index.StoreConfig{
Url: "file://" + t.TempDir(),
Workers: 4,
})
require.NoError(t, err)

// Mark the account as active in checkpoints 1, 5 and 10 only.
for _, checkpoint := range []uint32{1, 5, 10} {
require.NoError(t, store.AddParticipantsToIndexes(
checkpoint, allTransactionsIndex, []string{accountId}))
}

cursorMgr := NewCursorManagerForAccountActivity(store, accountId)

cursor := toid.New(1, 1, 1)
var nextCursor int64

// first checkpoint works
nextCursor, err = cursorMgr.Begin(cursor.ToInt64())
require.NoError(t, err)
assert.EqualValues(t, 1, getLedgerFromCursor(nextCursor))

// cursor is preserved if mid-active-range
cursor.LedgerSequence = freq / 2
nextCursor, err = cursorMgr.Begin(cursor.ToInt64())
require.NoError(t, err)
assert.EqualValues(t, cursor.LedgerSequence, getLedgerFromCursor(nextCursor))

// cursor jumps ahead if not active
// The next active checkpoint is 5, whose range starts at (5-1)*freq.
cursor.LedgerSequence = 2 * freq
nextCursor, err = cursorMgr.Begin(cursor.ToInt64())
require.NoError(t, err)
assert.EqualValues(t, 4*freq, getLedgerFromCursor(nextCursor))

// Within that range, each Advance moves forward by exactly one ledger.
for i := int32(1); i < freq; i++ {
nextCursor, err = cursorMgr.Advance()
require.NoError(t, err)
assert.EqualValues(t, 4*freq+i, getLedgerFromCursor(nextCursor))
}

// cursor jumps to next active checkpoint
// The next active checkpoint is 10, whose range starts at (10-1)*freq.
nextCursor, err = cursorMgr.Advance()
require.NoError(t, err)
assert.EqualValues(t, 9*freq, getLedgerFromCursor(nextCursor))

// cursor increments
for i := int32(1); i < freq; i++ {
nextCursor, err = cursorMgr.Advance()
require.NoError(t, err)
assert.EqualValues(t, 9*freq+i, getLedgerFromCursor(nextCursor))
}

// cursor stops when no more actives
// No checkpoint after 10 was indexed, so the lookup is expected to report
// io.EOF.
_, err = cursorMgr.Advance()
assert.ErrorIs(t, err, io.EOF)
}
83 changes: 36 additions & 47 deletions exp/lighthorizon/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
"github.com/stellar/go/exp/lighthorizon/common"
"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)

const (
allIndexes = "all/all"
allTransactionsIndex = "all/all"
allPaymentsIndex = "all/payments"
)

var (
Expand Down Expand Up @@ -52,16 +52,24 @@ type TransactionRepository interface {
GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error)
}

// searchCallback is a generic way for any endpoint to process a transaction and
// its corresponding ledger. It should return whether or not we should stop
// processing (e.g. when a limit is reached) and any error that occurred.
//
// searchTxByAccount invokes it for each transaction whose participants include
// the requested account, and stops searching as soon as the callback reports
// finished or returns a non-nil error.
type searchCallback func(archive.LedgerTransaction, *xdr.LedgerHeader) (finished bool, err error)

func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Operation, error) {
func (os *OperationsService) GetOperationsByAccount(ctx context.Context,
cursor int64, limit uint64,
accountId string,
) ([]common.Operation, error) {
ops := []common.Operation{}

opsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) {
for operationOrder, op := range tx.Envelope.Operations() {
opParticipants, opParticipantErr := os.Config.Archive.GetOperationParticipants(tx, op, operationOrder)
if opParticipantErr != nil {
return false, opParticipantErr
opParticipants, err := os.Config.Archive.GetOperationParticipants(tx, op, operationOrder)
if err != nil {
return false, err
}

if _, foundInOp := opParticipants[accountId]; foundInOp {
ops = append(ops, common.Operation{
TransactionEnvelope: &tx.Envelope,
Expand All @@ -70,11 +78,13 @@ func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor
TxIndex: int32(tx.Index),
OpIndex: int32(operationOrder),
})

if uint64(len(ops)) == limit {
return true, nil
}
}
}

return false, nil
}

Expand All @@ -85,7 +95,10 @@ func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor
return ops, nil
}

func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error) {
func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context,
cursor int64, limit uint64,
accountId string,
) ([]common.Transaction, error) {
txs := []common.Transaction{}

txsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) {
Expand All @@ -96,7 +109,8 @@ func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cur
TxIndex: int32(tx.Index),
NetworkPassphrase: ts.Config.Passphrase,
})
return (uint64(len(txs)) >= limit), nil

return uint64(len(txs)) == limit, nil
}

if err := searchTxByAccount(ctx, cursor, accountId, ts.Config, txsCallback); err != nil {
Expand All @@ -106,18 +120,23 @@ func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cur
}

func searchTxByAccount(ctx context.Context, cursor int64, accountId string, config Config, callback searchCallback) error {
nextLedger, err := getAccountNextLedgerCursor(accountId, cursor, config.IndexStore, allIndexes)
cursorMgr := NewCursorManagerForAccountActivity(config.IndexStore, accountId)
cursor, err := cursorMgr.Begin(cursor)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
log.Debugf("Searching index by account %v starting at cursor %v", accountId, nextLedger)

nextLedger := getLedgerFromCursor(cursor)
log.Debugf("Searching %s for account %s starting at ledger %d",
allTransactionsIndex, accountId, nextLedger)

for {
ledger, ledgerErr := config.Archive.GetLedger(ctx, uint32(nextLedger))
ledger, ledgerErr := config.Archive.GetLedger(ctx, nextLedger)
if ledgerErr != nil {
return errors.Wrapf(ledgerErr, "ledger export state is out of sync, missing ledger %v from checkpoint %v", nextLedger, getIndexCheckpointCounter(uint32(nextLedger)))
return errors.Wrapf(ledgerErr,
"ledger export state is out of sync at ledger %d", nextLedger)
}

reader, readerErr := config.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(config.Passphrase, ledger)
Expand All @@ -140,54 +159,24 @@ func searchTxByAccount(ctx context.Context, cursor int64, accountId string, conf
}

if _, found := participants[accountId]; found {
if finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header); callBackErr != nil {
finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header)
if finished || callBackErr != nil {
return callBackErr
} else if finished {
return nil
}
}

if ctx.Err() != nil {
return ctx.Err()
}
}
nextCursor := toid.New(int32(nextLedger), 1, 1).ToInt64()
nextLedger, err = getAccountNextLedgerCursor(accountId, nextCursor, config.IndexStore, allIndexes)

cursor, err = cursorMgr.Advance()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}

// this deals in ledgers but adapts to the index model, which is currently keyed by checkpoint for now
func getAccountNextLedgerCursor(accountId string, cursor int64, store index.Store, indexName string) (uint64, error) {
nextLedger := uint32(toid.Parse(cursor).LedgerSequence + 1)

// done for performance reasons, skip reading the index for any requested ledger cursors
// only need to read the index when next cursor falls on checkpoint boundary
if !checkpointManager.IsCheckpoint(nextLedger) {
return uint64(nextLedger), nil
}

// the 'NextActive' index query takes a starting checkpoint, from which the index is scanned AFTER that checkpoint, non-inclusive
// use the current checkpoint as the starting point since it represents up to the cursor's ledger
queryStartingCheckpoint := getIndexCheckpointCounter(nextLedger)
indexNextCheckpoint, err := store.NextActive(accountId, indexName, queryStartingCheckpoint)

if err != nil {
return 0, err
nextLedger = getLedgerFromCursor(cursor)
}

// return the first ledger of the next index checkpoint that had account activity after cursor
// so we need to go back 64 ledgers(one checkpoint's worth) relative to next index checkpoint
// to get first ledger, since checkpoint ledgers are the last/greatest ledger in the checkpoint range
return uint64((indexNextCheckpoint - 1) * checkpointManager.GetCheckpointFrequency()), nil
}

// getIndexCheckpointCounter derives the checkpoint number under which the
// given ledger would be recorded in the index: the checkpoint ledger reported
// by the checkpoint manager, divided by the checkpoint frequency, plus one.
func getIndexCheckpointCounter(ledger uint32) uint32 {
	checkpointLedger := checkpointManager.GetCheckpoint(ledger)
	freq := checkpointManager.GetCheckpointFrequency()
	return checkpointLedger/freq + 1
}
Loading

0 comments on commit 6a9373e

Please sign in to comment.