Skip to content

Commit

Permalink
services/horizon/internal/ingest/processors: Refactor participants pr…
Browse files Browse the repository at this point in the history
…ocessors to support new ingestion data flow (#5024)
  • Loading branch information
tamirms authored Aug 23, 2023
1 parent a4db2a9 commit 461e5a1
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type MockOperationParticipantBatchInsertBuilder struct {
}

// Add mock
func (m *MockOperationParticipantBatchInsertBuilder) Add(operationID int64, accountID int64) error {
a := m.Called(operationID, accountID)
func (m *MockOperationParticipantBatchInsertBuilder) Add(operationID int64, account FutureAccountID) error {
a := m.Called(operationID, account)
return a.Error(0)
}

Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/db2/history/mock_q_participants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package history
import (
"context"

"github.com/stellar/go/support/db"
"github.com/stretchr/testify/mock"

"github.com/stellar/go/support/db"
)

// MockQParticipants is a mock implementation of the QParticipants interface
Expand All @@ -28,7 +29,7 @@ type MockTransactionParticipantsBatchInsertBuilder struct {
mock.Mock
}

func (m *MockTransactionParticipantsBatchInsertBuilder) Add(transactionID, accountID int64) error {
func (m *MockTransactionParticipantsBatchInsertBuilder) Add(transactionID int64, accountID FutureAccountID) error {
a := m.Called(transactionID, accountID)
return a.Error(0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type OperationParticipantBatchInsertBuilder interface {
Add(
operationID int64,
accountID int64,
accountID FutureAccountID,
) error
Exec(ctx context.Context, session db.SessionInterface) error
}
Expand All @@ -33,7 +33,7 @@ func (q *Q) NewOperationParticipantBatchInsertBuilder() OperationParticipantBatc
// Add adds an operation participant to the batch
func (i *operationParticipantBatchInsertBuilder) Add(
operationID int64,
accountID int64,
accountID FutureAccountID,
) error {
return i.builder.Row(map[string]interface{}{
"history_operation_id": operationID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
)

Expand All @@ -13,13 +15,15 @@ func TestAddOperationParticipants(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

accountLoader := NewAccountLoader()
address := keypair.MustRandom().Address()
tt.Assert.NoError(q.Begin())
builder := q.NewOperationParticipantBatchInsertBuilder()
err := builder.Add(240518172673, 1)
err := builder.Add(240518172673, accountLoader.GetFuture(address))
tt.Assert.NoError(err)

err = builder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)
tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(builder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

type hop struct {
Expand All @@ -39,6 +43,6 @@ func TestAddOperationParticipants(t *testing.T) {

op := ops[0]
tt.Assert.Equal(int64(240518172673), op.OperationID)
tt.Assert.Equal(int64(1), op.AccountID)
tt.Assert.Equal(accountLoader.getNow(address), op.AccountID)
}
}
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/participants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type QParticipants interface {
// TransactionParticipantsBatchInsertBuilder is used to insert transaction participants into the
// history_transaction_participants table
type TransactionParticipantsBatchInsertBuilder interface {
Add(transactionID, accountID int64) error
Add(transactionID int64, accountID FutureAccountID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -34,7 +34,7 @@ func (q *Q) NewTransactionParticipantsBatchInsertBuilder() TransactionParticipan
}

// Add adds a new transaction participant to the batch
func (i *transactionParticipantsBatchInsertBuilder) Add(transactionID, accountID int64) error {
func (i *transactionParticipantsBatchInsertBuilder) Add(transactionID int64, accountID FutureAccountID) error {
return i.builder.Row(map[string]interface{}{
"history_transaction_id": transactionID,
"history_account_id": accountID,
Expand Down
37 changes: 23 additions & 14 deletions services/horizon/internal/db2/history/participants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
)

Expand Down Expand Up @@ -32,29 +34,36 @@ func TestTransactionParticipantsBatch(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

tt.Assert.NoError(q.Begin())
batch := q.NewTransactionParticipantsBatchInsertBuilder()
accountLoader := NewAccountLoader()

transactionID := int64(1)
otherTransactionID := int64(2)
accountID := int64(100)

var addresses []string
for i := int64(0); i < 3; i++ {
tt.Assert.NoError(batch.Add(transactionID, accountID+i))
address := keypair.MustRandom().Address()
addresses = append(addresses, address)
tt.Assert.NoError(batch.Add(transactionID, accountLoader.GetFuture(address)))
}

tt.Assert.NoError(batch.Add(otherTransactionID, accountID))
address := keypair.MustRandom().Address()
addresses = append(addresses, address)
tt.Assert.NoError(batch.Add(otherTransactionID, accountLoader.GetFuture(address)))

tt.Assert.NoError(q.Begin())
tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(batch.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

participants := getTransactionParticipants(tt, q)
tt.Assert.Equal(
[]transactionParticipant{
{TransactionID: 1, AccountID: 100},
{TransactionID: 1, AccountID: 101},
{TransactionID: 1, AccountID: 102},
{TransactionID: 2, AccountID: 100},
},
participants,
)
expected := []transactionParticipant{
{TransactionID: 1},
{TransactionID: 1},
{TransactionID: 1},
{TransactionID: 2},
}
for i := range expected {
expected[i].AccountID = accountLoader.getNow(addresses[i])
}
tt.Assert.ElementsMatch(expected, participants)
}
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (m *mockDBQ) NewTransactionParticipantsBatchInsertBuilder() history.Transac

func (m *mockDBQ) NewOperationParticipantBatchInsertBuilder() history.OperationParticipantBatchInsertBuilder {
args := m.Called()
return args.Get(0).(history.TransactionParticipantsBatchInsertBuilder)
return args.Get(0).(history.OperationParticipantBatchInsertBuilder)
}

func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder {
Expand Down
146 changes: 28 additions & 118 deletions services/horizon/internal/ingest/processors/participants_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
set "github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/toid"
Expand All @@ -17,66 +16,23 @@ import (
// ParticipantsProcessor is a processor which ingests various participants
// from different sources (transactions, operations, etc)
type ParticipantsProcessor struct {
session db.SessionInterface
participantsQ history.QParticipants
sequence uint32
participantSet map[string]participant
accountLoader *history.AccountLoader
txBatch history.TransactionParticipantsBatchInsertBuilder
opBatch history.OperationParticipantBatchInsertBuilder
}

func NewParticipantsProcessor(session db.SessionInterface, participantsQ history.QParticipants, sequence uint32) *ParticipantsProcessor {
func NewParticipantsProcessor(
accountLoader *history.AccountLoader,
txBatch history.TransactionParticipantsBatchInsertBuilder,
opBatch history.OperationParticipantBatchInsertBuilder,
) *ParticipantsProcessor {
return &ParticipantsProcessor{
session: session,
participantsQ: participantsQ,
sequence: sequence,
participantSet: map[string]participant{},
accountLoader: accountLoader,
txBatch: txBatch,
opBatch: opBatch,
}
}

type participant struct {
accountID int64
transactionSet set.Set[int64]
operationSet set.Set[int64]
}

func (p *participant) addTransactionID(id int64) {
if p.transactionSet == nil {
p.transactionSet = set.Set[int64]{}
}
p.transactionSet.Add(id)
}

func (p *participant) addOperationID(id int64) {
if p.operationSet == nil {
p.operationSet = set.Set[int64]{}
}
p.operationSet.Add(id)
}

func (p *ParticipantsProcessor) loadAccountIDs(ctx context.Context, participantSet map[string]participant) error {
addresses := make([]string, 0, len(participantSet))
for address := range participantSet {
addresses = append(addresses, address)
}

addressToID, err := p.participantsQ.CreateAccounts(ctx, addresses, maxBatchSize)
if err != nil {
return errors.Wrap(err, "Could not create account ids")
}

for _, address := range addresses {
id, ok := addressToID[address]
if !ok {
return errors.Errorf("no id found for account address %s", address)
}

participantForAddress := participantSet[address]
participantForAddress.accountID = id
participantSet[address] = participantForAddress
}

return nil
}

func participantsForChanges(
changes xdr.LedgerEntryChanges,
) ([]xdr.AccountId, error) {
Expand Down Expand Up @@ -144,7 +100,6 @@ func participantsForMeta(
}

func (p *ParticipantsProcessor) addTransactionParticipants(
participantSet map[string]participant,
sequence uint32,
transaction ingest.LedgerTransaction,
) error {
Expand All @@ -158,17 +113,15 @@ func (p *ParticipantsProcessor) addTransactionParticipants(
}

for _, participant := range transactionParticipants {
address := participant.Address()
entry := participantSet[address]
entry.addTransactionID(transactionID)
participantSet[address] = entry
if err := p.txBatch.Add(transactionID, p.accountLoader.GetFuture(participant.Address())); err != nil {
return err
}
}

return nil
}

func (p *ParticipantsProcessor) addOperationsParticipants(
participantSet map[string]participant,
sequence uint32,
transaction ingest.LedgerTransaction,
) error {
Expand All @@ -177,82 +130,39 @@ func (p *ParticipantsProcessor) addOperationsParticipants(
return errors.Wrap(err, "could not determine operation participants")
}

for operationID, p := range participants {
for _, participant := range p {
for operationID, addresses := range participants {
for _, participant := range addresses {
address := participant.Address()
entry := participantSet[address]
entry.addOperationID(operationID)
participantSet[address] = entry
}
}

return nil
}

func (p *ParticipantsProcessor) insertDBTransactionParticipants(ctx context.Context, participantSet map[string]participant) error {
batch := p.participantsQ.NewTransactionParticipantsBatchInsertBuilder()

for _, entry := range participantSet {
for transactionID := range entry.transactionSet {
if err := batch.Add(transactionID, entry.accountID); err != nil {
return errors.Wrap(err, "Could not insert transaction participant in db")
if err := p.opBatch.Add(operationID, p.accountLoader.GetFuture(address)); err != nil {
return err
}
}
}

if err := batch.Exec(ctx, p.session); err != nil {
return errors.Wrap(err, "Could not flush transaction participants to db")
}
return nil
}

func (p *ParticipantsProcessor) insertDBOperationsParticipants(ctx context.Context, participantSet map[string]participant) error {
batch := p.participantsQ.NewOperationParticipantBatchInsertBuilder()
func (p *ParticipantsProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {

for _, entry := range participantSet {
for operationID := range entry.operationSet {
if err := batch.Add(operationID, entry.accountID); err != nil {
return errors.Wrap(err, "could not insert operation participant in db")
}
}
}

if err := batch.Exec(ctx, p.session); err != nil {
return errors.Wrap(err, "could not flush operation participants to db")
}
return nil
}

func (p *ParticipantsProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error) {
err = p.addTransactionParticipants(p.participantSet, p.sequence, transaction)
if err != nil {
if err := p.addTransactionParticipants(lcm.LedgerSequence(), transaction); err != nil {
return err
}

err = p.addOperationsParticipants(p.participantSet, p.sequence, transaction)
if err != nil {
if err := p.addOperationsParticipants(lcm.LedgerSequence(), transaction); err != nil {
return err
}

return nil
}

func (p *ParticipantsProcessor) Commit(ctx context.Context) (err error) {
if len(p.participantSet) > 0 {
if err = p.loadAccountIDs(ctx, p.participantSet); err != nil {
return err
}

if err = p.insertDBTransactionParticipants(ctx, p.participantSet); err != nil {
return err
}

if err = p.insertDBOperationsParticipants(ctx, p.participantSet); err != nil {
return err
}
func (p *ParticipantsProcessor) Commit(ctx context.Context, session db.SessionInterface) error {
if err := p.txBatch.Exec(ctx, session); err != nil {
return errors.Wrap(err, "Could not flush transaction participants to db")
}

return err
if err := p.opBatch.Exec(ctx, session); err != nil {
return errors.Wrap(err, "Could not flush operation participants to db")
}
return nil
}

func ParticipantsForTransaction(
Expand Down
Loading

0 comments on commit 461e5a1

Please sign in to comment.