Skip to content

Commit

Permalink
stellar#4433: refactor domain model for operations/transactions inter…
Browse files Browse the repository at this point in the history
…face
  • Loading branch information
sreuland committed Jul 11, 2022
1 parent fc233f8 commit 6aff024
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 64 deletions.
4 changes: 2 additions & 2 deletions exp/lighthorizon/actions/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewTXByAccountHandler(lightHorizon services.LightHorizon) func(http.Respons
page.Init()
page.FullURL = r.URL

txns, err := lightHorizon.GetTransactionsByAccount(ctx, paginate.Cursor, paginate.Limit, accountId)
txns, err := lightHorizon.Transactions.GetTransactionsByAccount(ctx, paginate.Cursor, paginate.Limit, accountId)
if err != nil {
log.Error(err)
sendErrorResponse(w, http.StatusInternalServerError, "")
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewOpsByAccountHandler(lightHorizon services.LightHorizon) func(http.Respon
page.Init()
page.FullURL = r.URL

ops, err := lightHorizon.GetOperationsByAccount(ctx, paginate.Cursor, paginate.Limit, accountId)
ops, err := lightHorizon.Operations.GetOperationsByAccount(ctx, paginate.Cursor, paginate.Limit, accountId)
if err != nil {
log.Error(err)
sendErrorResponse(w, http.StatusInternalServerError, "")
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/actions/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Operations(lh services.LightHorizon) func(http.ResponseWriter, *http.Reques
page.FullURL = r.URL

//TODO - implement paginate.Order(asc/desc)
ops, err := lh.GetOperations(r.Context(), paginate.Cursor, paginate.Limit)
ops, err := lh.Operations.GetOperations(r.Context(), paginate.Cursor, paginate.Limit)
if err != nil {
log.Error(err)
sendErrorResponse(w, http.StatusInternalServerError, "")
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/actions/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Transactions(lh services.LightHorizon) func(http.ResponseWriter, *http.Requ
page.FullURL = r.URL

//TODO - implement paginate.Order(asc/desc)
txns, err := lh.GetTransactions(r.Context(), paginate.Cursor, paginate.Limit)
txns, err := lh.Transactions.GetTransactions(r.Context(), paginate.Cursor, paginate.Limit)
if err != nil {
log.Error(err)
sendErrorResponse(w, http.StatusInternalServerError, "")
Expand Down
13 changes: 10 additions & 3 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ func main() {
defer ingestArchive.Close()

lightHorizon := services.LightHorizon{
Archive: ingestArchive,
Passphrase: *networkPassphrase,
IndexStore: indexStore,
Transactions: services.TransactionsService{
Archive: ingestArchive,
Passphrase: *networkPassphrase,
IndexStore: indexStore,
},
Operations: services.OperationsService{
Archive: ingestArchive,
Passphrase: *networkPassphrase,
IndexStore: indexStore,
},
}

router := chi.NewMux()
Expand Down
61 changes: 36 additions & 25 deletions exp/lighthorizon/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,38 @@ import (
)

type LightHorizon struct {
AccountService,
TransactionService,
Operations OperationsService
Transactions TransactionsService
}

type TransactionsService struct {
TransactionRepository,
Archive archive.Archive
IndexStore index.Store
Passphrase string
}

type OperationsService struct {
OperationsRepository,
Archive archive.Archive
IndexStore index.Store
Passphrase string
}

type AccountService interface {
type OperationsRepository interface {
GetOperationsByAccount(ctx context.Context, cursor int64, limit int64, accountId string) ([]common.Operation, error)
GetOperations(ctx context.Context, cursor int64, limit int64) ([]common.Operation, error)
}

type TransactionService interface {
type TransactionRepository interface {
GetTransactionsByAccount(ctx context.Context, cursor int64, limit int64, accountId string) ([]common.Transaction, error)
GetTransactions(ctx context.Context, cursor int64, limit int64) ([]common.Transaction, error)
}

func (lh *LightHorizon) GetOperationsByAccount(ctx context.Context, cursor int64, limit int64, accountId string) ([]common.Operation, error) {
func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor int64, limit int64, accountId string) ([]common.Operation, error) {
ops := []common.Operation{}
// Skip the cursor ahead to the next active checkpoint for this account
nextCheckpoint, err := lh.getAccountNextCheckpointCursor(accountId, cursor)
nextCheckpoint, err := getAccountNextCheckpointCursor(accountId, cursor, os.IndexStore)
if err != nil {
if err == io.EOF {
return ops, nil
Expand All @@ -49,12 +60,12 @@ func (lh *LightHorizon) GetOperationsByAccount(ctx context.Context, cursor int64
ledgerSequence := startingCheckPointLedger

for (ledgerSequence - startingCheckPointLedger) < 64 {
ledger, ledgerErr := lh.Archive.GetLedger(ctx, ledgerSequence)
ledger, ledgerErr := os.Archive.GetLedger(ctx, ledgerSequence)
if ledgerErr != nil {
return nil, errors.Wrapf(ledgerErr, "ledger export state is out of sync, missing ledger %v from checkpoint %v", ledgerSequence, ledgerSequence/64)
}

reader, readerErr := lh.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(lh.Passphrase, ledger)
reader, readerErr := os.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(os.Passphrase, ledger)
if readerErr != nil {
return nil, readerErr
}
Expand All @@ -70,14 +81,14 @@ func (lh *LightHorizon) GetOperationsByAccount(ctx context.Context, cursor int64
}

transactionOrder++
participants, participantErr := lh.Archive.GetTransactionParticipants(tx)
participants, participantErr := os.Archive.GetTransactionParticipants(tx)
if participantErr != nil {
return nil, participantErr
}

if _, found := participants[accountId]; found {
for operationOrder, op := range tx.Envelope.Operations() {
opParticipants, opParticipantErr := lh.Archive.GetOperationParticipants(tx, op, operationOrder+1)
opParticipants, opParticipantErr := os.Archive.GetOperationParticipants(tx, op, operationOrder+1)
if opParticipantErr != nil {
return nil, opParticipantErr
}
Expand All @@ -103,7 +114,7 @@ func (lh *LightHorizon) GetOperationsByAccount(ctx context.Context, cursor int64
ledgerSequence++
}

nextCheckpoint, err = lh.getAccountNextCheckpointCursor(accountId, nextCheckpoint)
nextCheckpoint, err = getAccountNextCheckpointCursor(accountId, nextCheckpoint, os.IndexStore)
if err != nil {
if err == io.EOF {
return ops, nil
Expand All @@ -113,10 +124,10 @@ func (lh *LightHorizon) GetOperationsByAccount(ctx context.Context, cursor int64
}
}

func (lh *LightHorizon) GetTransactionsByAccount(ctx context.Context, cursor int64, limit int64, accountId string) ([]common.Transaction, error) {
func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cursor int64, limit int64, accountId string) ([]common.Transaction, error) {
txs := []common.Transaction{}
// Skip the cursor ahead to the next active checkpoint for this account
nextCheckpoint, err := lh.getAccountNextCheckpointCursor(accountId, cursor)
nextCheckpoint, err := getAccountNextCheckpointCursor(accountId, cursor, ts.IndexStore)
if err != nil {
if err == io.EOF {
return txs, nil
Expand All @@ -130,12 +141,12 @@ func (lh *LightHorizon) GetTransactionsByAccount(ctx context.Context, cursor int
ledgerSequence := startingCheckPointLedger

for (ledgerSequence - startingCheckPointLedger) < 64 {
ledger, ledgerErr := lh.Archive.GetLedger(ctx, ledgerSequence)
ledger, ledgerErr := ts.Archive.GetLedger(ctx, ledgerSequence)
if ledgerErr != nil {
return nil, errors.Wrapf(ledgerErr, "ledger export state is out of sync, missing ledger %v from checkpoint %v", ledgerSequence, ledgerSequence/64)
}

reader, readerErr := lh.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(lh.Passphrase, ledger)
reader, readerErr := ts.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(ts.Passphrase, ledger)
if readerErr != nil {
return nil, readerErr
}
Expand All @@ -151,7 +162,7 @@ func (lh *LightHorizon) GetTransactionsByAccount(ctx context.Context, cursor int
}

transactionOrder++
participants, participantErr := lh.Archive.GetTransactionParticipants(tx)
participants, participantErr := ts.Archive.GetTransactionParticipants(tx)
if participantErr != nil {
return nil, participantErr
}
Expand All @@ -175,7 +186,7 @@ func (lh *LightHorizon) GetTransactionsByAccount(ctx context.Context, cursor int
ledgerSequence++
}

nextCheckpoint, err = lh.getAccountNextCheckpointCursor(accountId, nextCheckpoint)
nextCheckpoint, err = getAccountNextCheckpointCursor(accountId, nextCheckpoint, ts.IndexStore)
if err != nil {
if err == io.EOF {
return txs, nil
Expand All @@ -185,7 +196,7 @@ func (lh *LightHorizon) GetTransactionsByAccount(ctx context.Context, cursor int
}
}

func (lh *LightHorizon) GetOperations(ctx context.Context, cursor int64, limit int64) ([]common.Operation, error) {
func (os *OperationsService) GetOperations(ctx context.Context, cursor int64, limit int64) ([]common.Operation, error) {
parsedID := toid.Parse(cursor)
ledgerSequence := uint32(parsedID.LedgerSequence)
if ledgerSequence < 2 {
Expand All @@ -200,13 +211,13 @@ func (lh *LightHorizon) GetOperations(ctx context.Context, cursor int64, limit i

for {
log.Debugf("Checking ledger %d", ledgerSequence)
ledger, err := lh.Archive.GetLedger(ctx, ledgerSequence)
ledger, err := os.Archive.GetLedger(ctx, ledgerSequence)
if err != nil {
// no 'NotFound' distinction on err, treat all as not found.
return ops, nil
}

reader, err := lh.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(lh.Passphrase, ledger)
reader, err := os.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(os.Passphrase, ledger)
if err != nil {
return nil, errors.Wrapf(err, "error in ledger %d", ledgerSequence)
}
Expand Down Expand Up @@ -253,7 +264,7 @@ func (lh *LightHorizon) GetOperations(ctx context.Context, cursor int64, limit i
}
}

func (lh *LightHorizon) GetTransactions(ctx context.Context, cursor int64, limit int64) ([]common.Transaction, error) {
func (ts *TransactionsService) GetTransactions(ctx context.Context, cursor int64, limit int64) ([]common.Transaction, error) {
parsedID := toid.Parse(cursor)
ledgerSequence := uint32(parsedID.LedgerSequence)
if ledgerSequence < 2 {
Expand All @@ -268,13 +279,13 @@ func (lh *LightHorizon) GetTransactions(ctx context.Context, cursor int64, limit

for {
log.Debugf("Checking ledger %d", ledgerSequence)
ledger, err := lh.Archive.GetLedger(ctx, ledgerSequence)
ledger, err := ts.Archive.GetLedger(ctx, ledgerSequence)
if err != nil {
// no 'NotFound' distinction on err, treat all as not found.
return txns, nil
}

reader, err := lh.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(lh.Passphrase, ledger)
reader, err := ts.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(ts.Passphrase, ledger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,9 +333,9 @@ func (lh *LightHorizon) GetTransactions(ctx context.Context, cursor int64, limit
}
}

func (lh *LightHorizon) getAccountNextCheckpointCursor(accountId string, cursor int64) (int64, error) {
func getAccountNextCheckpointCursor(accountId string, cursor int64, store index.Store) (int64, error) {
var checkpoint uint32
checkpoint, err := lh.IndexStore.NextActive(accountId, "all/all", uint32(toid.Parse(cursor).LedgerSequence/64))
checkpoint, err := store.NextActive(accountId, "all/all", uint32(toid.Parse(cursor).LedgerSequence/64))
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 6aff024

Please sign in to comment.