Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fast_copy_ledger_…
Browse files Browse the repository at this point in the history
…range
  • Loading branch information
sreuland committed Nov 28, 2023
2 parents d3a3926 + 69266de commit 80c9747
Show file tree
Hide file tree
Showing 20 changed files with 494 additions and 234 deletions.
3 changes: 2 additions & 1 deletion services/horizon/docker/docker-compose.integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ services:
# Note: Please keep the image pinned to an immutable tag matching the Captive Core version.
# This avoid implicit updates which break compatibility between
# the Core container and captive core.
image: ${CORE_IMAGE:-chowbao/stellar-core:19.11.1-1357.e38ee728d.focal-sorobanP10}
image: ${CORE_IMAGE:-stellar/stellar-core:19.13.1-1481.3acf6dd26.focal}

depends_on:
- core-postgres
restart: on-failure
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'
services:
horizon-postgres:
platform: linux/amd64
image: postgres:postgres:12-bullseye
image: postgres:12-bullseye
restart: on-failure
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
)

type AccountDataBatchInsertBuilder interface {
Add(data Data) error
Exec(ctx context.Context) error
}

type accountDataBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

func (q *Q) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder {
return &accountDataBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "accounts_data",
}
}

// Add adds a new account data to the batch
func (i *accountDataBatchInsertBuilder) Add(data Data) error {
ledgerKey, err := accountDataKeyToString(AccountDataKey{
AccountID: data.AccountID,
DataName: data.Name,
})
if err != nil {
return err
}
return i.builder.Row(map[string]interface{}{
"ledger_key": ledgerKey,
"account_id": data.AccountID,
"name": data.Name,
"value": data.Value,
"last_modified_ledger": data.LastModifiedLedger,
"sponsor": data.Sponsor,
})
}

// Exec writes the batch of account data to the database.
func (i *accountDataBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
4 changes: 3 additions & 1 deletion services/horizon/internal/db2/history/account_data_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func (t *AccountDataValue) Scan(src interface{}) error {

// Value implements driver.Valuer
func (value AccountDataValue) Value() (driver.Value, error) {
return driver.Value([]uint8(base64.StdEncoding.EncodeToString(value))), nil
// Return string to bypass buggy encoding in pq driver for []byte.
// More info https://github.com/stellar/go/issues/5086#issuecomment-1773215436)
return driver.Value(base64.StdEncoding.EncodeToString(value)), nil
}

func (value AccountDataValue) Base64() string {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
)

// AccountsBatchInsertBuilder is used to insert accounts into the accounts table
type AccountsBatchInsertBuilder interface {
Add(account AccountEntry) error
Exec(ctx context.Context) error
}

// AccountsBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type accountsBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewAccountsBatchInsertBuilder constructs a new AccountsBatchInsertBuilder instance
func (q *Q) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder {
return &accountsBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "accounts",
}
}

// Add adds a new account to the batch
func (i *accountsBatchInsertBuilder) Add(account AccountEntry) error {
return i.builder.RowStruct(account)
}

// Exec writes the batch of accounts to the database.
func (i *accountsBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
3 changes: 3 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ type QAccounts interface {
GetAccountsByIDs(ctx context.Context, ids []string) ([]AccountEntry, error)
UpsertAccounts(ctx context.Context, accounts []AccountEntry) error
RemoveAccounts(ctx context.Context, accountIDs []string) (int64, error)
NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder
}

// AccountSigner is a row of data from the `accounts_signers` table
Expand Down Expand Up @@ -356,6 +357,7 @@ type QData interface {
GetAccountDataByKeys(ctx context.Context, keys []AccountDataKey) ([]Data, error)
UpsertAccountData(ctx context.Context, data []Data) error
RemoveAccountData(ctx context.Context, keys []AccountDataKey) (int64, error)
NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder
}

// Asset is a row of data from the `history_assets` table
Expand Down Expand Up @@ -846,6 +848,7 @@ type QTrustLines interface {
GetTrustLinesByKeys(ctx context.Context, ledgerKeys []string) ([]TrustLine, error)
UpsertTrustLines(ctx context.Context, trustlines []TrustLine) error
RemoveTrustLines(ctx context.Context, ledgerKeys []string) (int64, error)
NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder
}

func (q *Q) NewAccountSignersBatchInsertBuilder() AccountSignersBatchInsertBuilder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockAccountDataBatchInsertBuilder struct {
mock.Mock
}

func (m *MockAccountDataBatchInsertBuilder) Add(data Data) error {
a := m.Called(data)
return a.Error(0)
}

func (m *MockAccountDataBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockAccountsBatchInsertBuilder struct {
mock.Mock
}

func (m *MockAccountsBatchInsertBuilder) Add(account AccountEntry) error {
a := m.Called(account)
return a.Error(0)
}

func (m *MockAccountsBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ func (m *MockQAccounts) RemoveAccounts(ctx context.Context, accountIDs []string)
a := m.Called(ctx, accountIDs)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQAccounts) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder {
a := m.Called()
return a.Get(0).(AccountsBatchInsertBuilder)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ func (m *MockQData) RemoveAccountData(ctx context.Context, keys []AccountDataKey
a := m.Called(ctx, keys)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQData) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder {
a := m.Called()
return a.Get(0).(AccountDataBatchInsertBuilder)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_trust_lines.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ func (m *MockQTrustLines) RemoveTrustLines(ctx context.Context, ledgerKeys []str
a := m.Called(ctx, ledgerKeys)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQTrustLines) NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder {
a := m.Called()
return a.Get(0).(TrustLinesBatchInsertBuilder)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockTrustLinesBatchInsertBuilder struct {
mock.Mock
}

func (m *MockTrustLinesBatchInsertBuilder) Add(line TrustLine) error {
a := m.Called(line)
return a.Error(0)
}

func (m *MockTrustLinesBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
)

// TrustLinesBatchInsertBuilder is used to insert trustlines into the trust_lines table
type TrustLinesBatchInsertBuilder interface {
Add(line TrustLine) error
Exec(ctx context.Context) error
}

// trustLinesBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type trustLinesBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewTrustLinesBatchInsertBuilder constructs a new TrustLinesBatchInsertBuilder instance
func (q *Q) NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder {
return &trustLinesBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "trust_lines",
}
}

// Add adds a new trustline to the batch
func (i *trustLinesBatchInsertBuilder) Add(line TrustLine) error {
return i.builder.RowStruct(line)
}

// Exec writes the batch of trust lines to the database.
func (i *trustLinesBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
64 changes: 43 additions & 21 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {

q := &mockDBQ{}

q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{
{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
SequenceTime: zero.IntFrom(0),
MasterWeight: 1,
},
}).Return(nil).Once()

batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true)
defer mock.AssertExpectationsForObjects(t, batchBuilders...)

Expand All @@ -49,6 +38,16 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
Sponsor: null.String{},
}).Return(nil).Once()

assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1])
batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
SequenceTime: zero.IntFrom(0),
MasterWeight: 1,
}).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -94,16 +93,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
nil,
).Once()

q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{
{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
MasterWeight: 1,
},
}).Return(nil).Once()

batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true)
defer mock.AssertExpectationsForObjects(t, batchBuilders...)

Expand All @@ -114,6 +103,15 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
Weight: 1,
}).Return(nil).Once()

assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1])
batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
MasterWeight: 1,
}).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -578,6 +576,13 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Twice()

mockAccountsBatchInsertBuilder := &history.MockAccountsBatchInsertBuilder{}
if mockExec {
mockAccountsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
}
q.MockQAccounts.On("NewAccountsBatchInsertBuilder").
Return(mockAccountsBatchInsertBuilder).Twice()

mockClaimableBalanceClaimantBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{}
if mockExec {
mockClaimableBalanceClaimantBatchInsertBuilder.On("Exec", ctx).
Expand Down Expand Up @@ -607,10 +612,27 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
q.MockQOffers.On("NewOffersBatchInsertBuilder").
Return(mockOfferBatchInsertBuilder).Twice()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
if mockExec {
mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
}
q.MockQData.On("NewAccountDataBatchInsertBuilder").
Return(mockAccountDataBatchInsertBuilder).Twice()

mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{}
if mockExec {
mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
}
q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder").
Return(mockTrustLinesBatchInsertBuilder)

return []interface{}{mockAccountSignersBatchInsertBuilder,
mockAccountsBatchInsertBuilder,
mockClaimableBalanceBatchInsertBuilder,
mockClaimableBalanceClaimantBatchInsertBuilder,
mockLiquidityPoolBatchInsertBuilder,
mockOfferBatchInsertBuilder,
mockAccountDataBatchInsertBuilder,
mockTrustLinesBatchInsertBuilder,
}
}
Loading

0 comments on commit 80c9747

Please sign in to comment.