Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest/processors: Refactor participants processors to support new ingestion data flow #5024

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type MockOperationParticipantBatchInsertBuilder struct {
}

// Add mock
func (m *MockOperationParticipantBatchInsertBuilder) Add(operationID int64, accountID int64) error {
a := m.Called(operationID, accountID)
func (m *MockOperationParticipantBatchInsertBuilder) Add(operationID int64, account FutureAccountID) error {
a := m.Called(operationID, account)
return a.Error(0)
}

Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/db2/history/mock_q_participants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package history
import (
"context"

"github.com/stellar/go/support/db"
"github.com/stretchr/testify/mock"

"github.com/stellar/go/support/db"
)

// MockQParticipants is a mock implementation of the QParticipants interface
Expand All @@ -28,7 +29,7 @@ type MockTransactionParticipantsBatchInsertBuilder struct {
mock.Mock
}

func (m *MockTransactionParticipantsBatchInsertBuilder) Add(transactionID, accountID int64) error {
func (m *MockTransactionParticipantsBatchInsertBuilder) Add(transactionID int64, accountID FutureAccountID) error {
a := m.Called(transactionID, accountID)
return a.Error(0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type OperationParticipantBatchInsertBuilder interface {
Add(
operationID int64,
accountID int64,
accountID FutureAccountID,
) error
Exec(ctx context.Context, session db.SessionInterface) error
}
Expand All @@ -33,7 +33,7 @@ func (q *Q) NewOperationParticipantBatchInsertBuilder() OperationParticipantBatc
// Add adds an operation participant to the batch
func (i *operationParticipantBatchInsertBuilder) Add(
operationID int64,
accountID int64,
accountID FutureAccountID,
) error {
return i.builder.Row(map[string]interface{}{
"history_operation_id": operationID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
)

Expand All @@ -13,13 +15,15 @@ func TestAddOperationParticipants(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

accountLoader := NewAccountLoader()
address := keypair.MustRandom().Address()
tt.Assert.NoError(q.Begin())
builder := q.NewOperationParticipantBatchInsertBuilder()
err := builder.Add(240518172673, 1)
err := builder.Add(240518172673, accountLoader.GetFuture(address))
tt.Assert.NoError(err)

err = builder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)
tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(builder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

type hop struct {
Expand All @@ -39,6 +43,6 @@ func TestAddOperationParticipants(t *testing.T) {

op := ops[0]
tt.Assert.Equal(int64(240518172673), op.OperationID)
tt.Assert.Equal(int64(1), op.AccountID)
tt.Assert.Equal(accountLoader.getNow(address), op.AccountID)
}
}
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/participants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type QParticipants interface {
// TransactionParticipantsBatchInsertBuilder is used to insert transaction participants into the
// history_transaction_participants table
type TransactionParticipantsBatchInsertBuilder interface {
Add(transactionID, accountID int64) error
Add(transactionID int64, accountID FutureAccountID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -34,7 +34,7 @@ func (q *Q) NewTransactionParticipantsBatchInsertBuilder() TransactionParticipan
}

// Add adds a new transaction participant to the batch
func (i *transactionParticipantsBatchInsertBuilder) Add(transactionID, accountID int64) error {
func (i *transactionParticipantsBatchInsertBuilder) Add(transactionID int64, accountID FutureAccountID) error {
return i.builder.Row(map[string]interface{}{
"history_transaction_id": transactionID,
"history_account_id": accountID,
Expand Down
37 changes: 23 additions & 14 deletions services/horizon/internal/db2/history/participants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
)

Expand Down Expand Up @@ -32,29 +34,36 @@ func TestTransactionParticipantsBatch(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

tt.Assert.NoError(q.Begin())
batch := q.NewTransactionParticipantsBatchInsertBuilder()
accountLoader := NewAccountLoader()

transactionID := int64(1)
otherTransactionID := int64(2)
accountID := int64(100)

var addresses []string
for i := int64(0); i < 3; i++ {
tt.Assert.NoError(batch.Add(transactionID, accountID+i))
address := keypair.MustRandom().Address()
addresses = append(addresses, address)
tt.Assert.NoError(batch.Add(transactionID, accountLoader.GetFuture(address)))
}

tt.Assert.NoError(batch.Add(otherTransactionID, accountID))
address := keypair.MustRandom().Address()
addresses = append(addresses, address)
tt.Assert.NoError(batch.Add(otherTransactionID, accountLoader.GetFuture(address)))

tt.Assert.NoError(q.Begin())
tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(batch.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

participants := getTransactionParticipants(tt, q)
tt.Assert.Equal(
[]transactionParticipant{
{TransactionID: 1, AccountID: 100},
{TransactionID: 1, AccountID: 101},
{TransactionID: 1, AccountID: 102},
{TransactionID: 2, AccountID: 100},
},
participants,
)
expected := []transactionParticipant{
{TransactionID: 1},
{TransactionID: 1},
{TransactionID: 1},
{TransactionID: 2},
}
for i := range expected {
expected[i].AccountID = accountLoader.getNow(addresses[i])
}
tt.Assert.ElementsMatch(expected, participants)
}
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (m *mockDBQ) NewTransactionParticipantsBatchInsertBuilder() history.Transac

func (m *mockDBQ) NewOperationParticipantBatchInsertBuilder() history.OperationParticipantBatchInsertBuilder {
args := m.Called()
return args.Get(0).(history.TransactionParticipantsBatchInsertBuilder)
return args.Get(0).(history.OperationParticipantBatchInsertBuilder)
}

func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder {
Expand Down
146 changes: 28 additions & 118 deletions services/horizon/internal/ingest/processors/participants_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
set "github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/toid"
Expand All @@ -17,66 +16,23 @@ import (
// ParticipantsProcessor is a processor which ingests various participants
// from different sources (transactions, operations, etc)
type ParticipantsProcessor struct {
session db.SessionInterface
participantsQ history.QParticipants
sequence uint32
participantSet map[string]participant
accountLoader *history.AccountLoader
txBatch history.TransactionParticipantsBatchInsertBuilder
opBatch history.OperationParticipantBatchInsertBuilder
}

func NewParticipantsProcessor(session db.SessionInterface, participantsQ history.QParticipants, sequence uint32) *ParticipantsProcessor {
func NewParticipantsProcessor(
accountLoader *history.AccountLoader,
txBatch history.TransactionParticipantsBatchInsertBuilder,
opBatch history.OperationParticipantBatchInsertBuilder,
) *ParticipantsProcessor {
return &ParticipantsProcessor{
session: session,
participantsQ: participantsQ,
sequence: sequence,
participantSet: map[string]participant{},
accountLoader: accountLoader,
txBatch: txBatch,
opBatch: opBatch,
}
}

type participant struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, the new Builder and Loaders decluttered the processor quite a bit!

accountID int64
transactionSet set.Set[int64]
operationSet set.Set[int64]
}

func (p *participant) addTransactionID(id int64) {
if p.transactionSet == nil {
p.transactionSet = set.Set[int64]{}
}
p.transactionSet.Add(id)
}

func (p *participant) addOperationID(id int64) {
if p.operationSet == nil {
p.operationSet = set.Set[int64]{}
}
p.operationSet.Add(id)
}

func (p *ParticipantsProcessor) loadAccountIDs(ctx context.Context, participantSet map[string]participant) error {
addresses := make([]string, 0, len(participantSet))
for address := range participantSet {
addresses = append(addresses, address)
}

addressToID, err := p.participantsQ.CreateAccounts(ctx, addresses, maxBatchSize)
if err != nil {
return errors.Wrap(err, "Could not create account ids")
}

for _, address := range addresses {
id, ok := addressToID[address]
if !ok {
return errors.Errorf("no id found for account address %s", address)
}

participantForAddress := participantSet[address]
participantForAddress.accountID = id
participantSet[address] = participantForAddress
}

return nil
}

func participantsForChanges(
changes xdr.LedgerEntryChanges,
) ([]xdr.AccountId, error) {
Expand Down Expand Up @@ -144,7 +100,6 @@ func participantsForMeta(
}

func (p *ParticipantsProcessor) addTransactionParticipants(
participantSet map[string]participant,
sequence uint32,
transaction ingest.LedgerTransaction,
) error {
Expand All @@ -158,17 +113,15 @@ func (p *ParticipantsProcessor) addTransactionParticipants(
}

for _, participant := range transactionParticipants {
address := participant.Address()
entry := participantSet[address]
entry.addTransactionID(transactionID)
participantSet[address] = entry
if err := p.txBatch.Add(transactionID, p.accountLoader.GetFuture(participant.Address())); err != nil {
return err
}
}

return nil
}

func (p *ParticipantsProcessor) addOperationsParticipants(
participantSet map[string]participant,
sequence uint32,
transaction ingest.LedgerTransaction,
) error {
Expand All @@ -177,82 +130,39 @@ func (p *ParticipantsProcessor) addOperationsParticipants(
return errors.Wrap(err, "could not determine operation participants")
}

for operationID, p := range participants {
for _, participant := range p {
for operationID, addresses := range participants {
for _, participant := range addresses {
address := participant.Address()
entry := participantSet[address]
entry.addOperationID(operationID)
participantSet[address] = entry
}
}

return nil
}

func (p *ParticipantsProcessor) insertDBTransactionParticipants(ctx context.Context, participantSet map[string]participant) error {
batch := p.participantsQ.NewTransactionParticipantsBatchInsertBuilder()

for _, entry := range participantSet {
for transactionID := range entry.transactionSet {
if err := batch.Add(transactionID, entry.accountID); err != nil {
return errors.Wrap(err, "Could not insert transaction participant in db")
if err := p.opBatch.Add(operationID, p.accountLoader.GetFuture(address)); err != nil {
return err
}
}
}

if err := batch.Exec(ctx, p.session); err != nil {
return errors.Wrap(err, "Could not flush transaction participants to db")
}
return nil
}

func (p *ParticipantsProcessor) insertDBOperationsParticipants(ctx context.Context, participantSet map[string]participant) error {
batch := p.participantsQ.NewOperationParticipantBatchInsertBuilder()
func (p *ParticipantsProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {

for _, entry := range participantSet {
for operationID := range entry.operationSet {
if err := batch.Add(operationID, entry.accountID); err != nil {
return errors.Wrap(err, "could not insert operation participant in db")
}
}
}

if err := batch.Exec(ctx, p.session); err != nil {
return errors.Wrap(err, "could not flush operation participants to db")
}
return nil
}

func (p *ParticipantsProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error) {
err = p.addTransactionParticipants(p.participantSet, p.sequence, transaction)
if err != nil {
if err := p.addTransactionParticipants(lcm.LedgerSequence(), transaction); err != nil {
return err
}

err = p.addOperationsParticipants(p.participantSet, p.sequence, transaction)
if err != nil {
if err := p.addOperationsParticipants(lcm.LedgerSequence(), transaction); err != nil {
return err
}

return nil
}

func (p *ParticipantsProcessor) Commit(ctx context.Context) (err error) {
if len(p.participantSet) > 0 {
if err = p.loadAccountIDs(ctx, p.participantSet); err != nil {
return err
}

if err = p.insertDBTransactionParticipants(ctx, p.participantSet); err != nil {
return err
}

if err = p.insertDBOperationsParticipants(ctx, p.participantSet); err != nil {
return err
}
func (p *ParticipantsProcessor) Commit(ctx context.Context, session db.SessionInterface) error {
if err := p.txBatch.Exec(ctx, session); err != nil {
return errors.Wrap(err, "Could not flush transaction participants to db")
}

return err
if err := p.opBatch.Exec(ctx, session); err != nil {
return errors.Wrap(err, "Could not flush operation participants to db")
}
return nil
}

func ParticipantsForTransaction(
Expand Down
Loading
Loading