Skip to content

Commit

Permalink
Resolve Merge issues
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Nov 2, 2023
1 parent 08b2a3d commit b8efa95
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 89 deletions.
22 changes: 11 additions & 11 deletions services/horizon/internal/actions/claimable_balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestGetClaimableBalanceByID(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
Expand Down Expand Up @@ -156,15 +156,10 @@ func TestGetClaimableBalances(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

<<<<<<< HEAD
q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})
defer q.SessionInterface.Rollback()
=======
tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
>>>>>>> c7bd7ac1 (Fix linter errors)

entriesMeta := []struct {
id xdr.Hash
Expand Down Expand Up @@ -206,6 +201,7 @@ func TestGetClaimableBalances(t *testing.T) {
}

balanceInsertbuilder := q.NewClaimableBalanceBatchInsertBuilder()

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()

for _, cBalance := range hCBs {
Expand Down Expand Up @@ -301,6 +297,7 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 0)

// new claimable balances are ingested, they should appear in the next pages
balanceInsertbuilder = q.NewClaimableBalanceBatchInsertBuilder()
claimantsInsertBuilder = q.NewClaimableBalanceClaimantBatchInsertBuilder()

Expand All @@ -324,13 +321,12 @@ func TestGetClaimableBalances(t *testing.T) {
},
}

hCBs = nil
for _, e := range entriesMeta {
entry := buildClaimableBalance(tt, e.id, e.accountID, e.ledger, e.asset)
hCBs = append(hCBs, entry)
}

for _, cBalance := range hCBs {
for _, cBalance := range hCBs[4:] {
tt.Assert.NoError(balanceInsertbuilder.Add(cBalance))

for _, claimant := range cBalance.Claimants {
Expand Down Expand Up @@ -370,7 +366,7 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.Len(response, 2)

// response should be the first 2 elements of entries
for i, entry := range hCBs {
for i, entry := range hCBs[4:] {
tt.Assert.Equal(entry.BalanceID, response[i].(protocol.ClaimableBalance).BalanceID)
}

Expand Down Expand Up @@ -401,7 +397,9 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 2)

tt.Assert.Equal(hCBs[1].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)
tt.Assert.Equal(hCBs[5].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

tt.Assert.Equal(hCBs[4].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand All @@ -417,6 +415,8 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 1)

tt.Assert.Equal(hCBs[0].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

// filter by asset
response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (i *claimableBalanceBatchInsertBuilder) Add(claimableBalance ClaimableBalan
return i.builder.RowStruct(claimableBalance)
}

// Exec inserts claimable balance rows to the database
// Exec writes the batch of claimable balances to the database.
func (i *claimableBalanceBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
return i.builder.Exec(ctx, session, i.table)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/stellar/go/xdr"
)

// ClaimableBalanceClaimantBatchInsertBuilder is used to insert transactions into the
// history_transactions table
// ClaimableBalanceClaimantBatchInsertBuilder is used to insert claimants into the
// claimable_balance_claimants table
type ClaimableBalanceClaimantBatchInsertBuilder interface {
Add(claimableBalanceClaimant ClaimableBalanceClaimant) error
Exec(ctx context.Context, session db.SessionInterface) error
Expand All @@ -31,12 +31,12 @@ func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClai
}
}

// Add adds a new claimant for a claimable Balance to the batch
// Add adds a new claimant to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(claimableBalanceClaimant)
}

// Exec flushes the entire batch into the database
// Exec writes the batch of claimants to the database.
func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
return i.builder.Exec(ctx, session, i.table)
}
Expand Down
2 changes: 0 additions & 2 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ func (c Claimants) Value() (driver.Value, error) {
// Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver
// (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436).
// By doing so, the data will be written as a string rather than hex encoded bytes.
// According to this https://www.postgresql.org/docs/current/datatype-json.html
// this may even improve write performance due to reduced conversion overhead.
val, err := json.Marshal(c)
return string(val), err
}
Expand Down
16 changes: 6 additions & 10 deletions services/horizon/internal/db2/history/claimable_balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package history
import (
"database/sql"
"fmt"
"github.com/guregu/null"

Check failure on line 6 in services/horizon/internal/db2/history/claimable_balances_test.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed with -local github.com/golangci/golangci-lint (goimports)

Check failure on line 6 in services/horizon/internal/db2/history/claimable_balances_test.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed with -local github.com/golangci/golangci-lint (goimports)
"testing"

"github.com/guregu/null"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/xdr"
Expand All @@ -16,7 +16,7 @@ func TestRemoveClaimableBalance(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}
tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestRemoveClaimableBalanceClaimants(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}
tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
Expand Down Expand Up @@ -409,9 +409,7 @@ func TestFindClaimableBalance(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})
defer q.SessionInterface.Rollback()
tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
Expand Down Expand Up @@ -460,9 +458,7 @@ func TestGetClaimableBalancesByID(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})
defer q.SessionInterface.Rollback()
tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
Expand Down
4 changes: 0 additions & 4 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/stellar/go/support/db"
"time"

"github.com/stellar/go/ingest"
Expand Down Expand Up @@ -407,10 +406,7 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (

groupChangeProcessors := buildChangeProcessor(
s.historyQ,
<<<<<<< HEAD
=======
s.session,
>>>>>>> 8f1836b1 (Use FastBatchInsertBuilder to insert to insert into claimable_balances and claimable_balance_claimants tables)
&changeStatsProcessor,
ledgerSource,
ledger.LedgerSequence(),
Expand Down
6 changes: 1 addition & 5 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
}

stats := &ingest.StatsChangeProcessor{}
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "")
processor := buildChangeProcessor(runner.historyQ, &db.MockSession{}, stats, ledgerSource, 123, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand All @@ -221,11 +221,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
filters: &MockFilters{},
}

<<<<<<< HEAD
processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "")
=======
processor = buildChangeProcessor(runner.historyQ, &db.MockSession{}, stats, historyArchiveSource, 456, "")
>>>>>>> 8f1836b1 (Use FastBatchInsertBuilder to insert to insert into claimable_balances and claimable_balance_claimants tables)
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
}
}

if err := p.InsertClaimableBalanceAndClaimants(ctx, cbsToInsert); err != nil {
return errors.Wrap(err, "error inserting claimable balance")
if len(cbsToInsert) > 0 {
if err := p.insertClaimableBalancesAndClaimants(ctx, cbsToInsert); err != nil {
return errors.Wrap(err, "error inserting claimable balance")
}
}

if len(cbIDsToDelete) > 0 {
Expand All @@ -108,19 +110,18 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
return nil
}

func (p *ClaimableBalancesChangeProcessor) InsertClaimableBalanceAndClaimants(ctx context.Context, claimableBalances []history.ClaimableBalance) error {
if len(claimableBalances) == 0 {
return nil
}

func (p *ClaimableBalancesChangeProcessor) insertClaimableBalancesAndClaimants(ctx context.Context,
claimableBalances []history.ClaimableBalance) error {
defer p.claimantsInsertBuilder.Reset()
defer p.claimableBalanceInsertBuilder.Reset()

for _, cb := range claimableBalances {

// Add claimable balance
if err := p.claimableBalanceInsertBuilder.Add(cb); err != nil {
return errors.Wrap(err, "error executing insert")
return errors.Wrap(err, "error adding to ClaimableBalanceBatchInsertBuilder")
}

// Add claimants
for _, claimant := range cb.Claimants {
claimant := history.ClaimableBalanceClaimant{
Expand All @@ -130,19 +131,19 @@ func (p *ClaimableBalancesChangeProcessor) InsertClaimableBalanceAndClaimants(ct
}

if err := p.claimantsInsertBuilder.Add(claimant); err != nil {
return errors.Wrap(err, "error adding to claimantsInsertBuilder")
return errors.Wrap(err, "error adding to ClaimableBalanceClaimantBatchInsertBuilder")
}
}
}

err := p.claimantsInsertBuilder.Exec(ctx, p.session)
if err != nil {
return errors.Wrap(err, "error executing claimableBalanceInsertBuilder")
return errors.Wrap(err, "error executing ClaimableBalanceClaimantBatchInsertBuilder")
}

err = p.claimableBalanceInsertBuilder.Exec(ctx, p.session)
if err != nil {
return errors.Wrap(err, "error executing claimantsInsertBuilder")
return errors.Wrap(err, "error executing ClaimableBalanceBatchInsertBuilder")
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/guregu/null"
"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -183,7 +182,7 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc
Ext: xdr.LedgerEntryExt{
V: 1,
V1: &xdr.LedgerEntryExtensionV1{
SponsoringId: nil,
SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
}
Expand All @@ -200,6 +199,7 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc
Asset: cBalance.Asset,
Amount: cBalance.Amount,
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
).Return(nil).Once()

Expand All @@ -209,42 +209,6 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc
Post: &entry,
})
s.Assert().NoError(err)

// add sponsor
updated := xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeClaimableBalance,
ClaimableBalance: &cBalance,
},
LastModifiedLedgerSeq: lastModifiedLedgerSeq,
Ext: xdr.LedgerEntryExt{
V: 1,
V1: &xdr.LedgerEntryExtensionV1{
SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
}

entry.LastModifiedLedgerSeq = entry.LastModifiedLedgerSeq - 1
err = s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeClaimableBalance,
Pre: &entry,
Post: &updated,
})
s.Assert().NoError(err)

// We use LedgerEntryChangesCache so all changes are squashed
s.mockClaimableBalanceBatchInsertBuilder.On(
"Add",
history.ClaimableBalance{
BalanceID: id,
Claimants: []history.Claimant{},
Asset: cBalance.Asset,
Amount: cBalance.Amount,
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
).Return(nil).Once()
}

func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBalance() {
Expand Down
8 changes: 5 additions & 3 deletions services/horizon/internal/ingest/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,13 @@ func TestStateVerifierLockBusy(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{&db.Session{DB: tt.HorizonDB}}

tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()

checkpointLedger := uint32(63)
changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "")
changeProcessor := buildChangeProcessor(q, q.SessionInterface, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "")

gen := randxdr.NewGenerator()
var changes []xdr.LedgerEntryChange
Expand All @@ -296,6 +296,8 @@ func TestStateVerifierLockBusy(t *testing.T) {
}
tt.Assert.NoError(changeProcessor.Commit(tt.Ctx))

tt.Assert.NoError(q.SessionInterface.Commit())

q.UpdateLastLedgerIngest(tt.Ctx, checkpointLedger)

mockHistoryAdapter := &mockHistoryArchiveAdapter{}
Expand Down Expand Up @@ -329,7 +331,7 @@ func TestStateVerifier(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{&db.Session{DB: tt.HorizonDB}}

tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{}))
tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.SessionInterface.Rollback()
}()
Expand Down

0 comments on commit b8efa95

Please sign in to comment.