Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

horizon/services/ingest: merge ingestion-next to master #5101

Merged
merged 21 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9bc197d
support/db: Add batch insert builder which uses COPY to insert rows (…
tamirms Jul 5, 2023
b451d60
Use FastBatchInsertBuilder to insert ledgers into the history_ledgers…
tamirms Jul 11, 2023
af8161b
services/horizon/internal/db2/history: Use FastBatchInsertBuilder to …
tamirms Jul 18, 2023
e163964
services/horizon/internal/db2/history: Use FastBatchInsertBuilder to …
tamirms Jul 19, 2023
6564385
services/horizon/internal/db2/history: Use FastBatchInsertBuilder for…
tamirms Jul 19, 2023
c645724
services/horizon/internal/db2/history: Use FastBatchInsertBuilder for…
tamirms Jul 24, 2023
88f19e4
services/horizon/internal/ingest/processors: Refactor ledgers, transa…
tamirms Aug 11, 2023
a630fcb
services/horizon/internal/db2/history: Implement account loader and f…
tamirms Aug 14, 2023
a4db2a9
services/horizon/internal/db2/history: Implement loaders for assets, …
tamirms Aug 21, 2023
461e5a1
services/horizon/internal/ingest/processors: Refactor participants pr…
tamirms Aug 23, 2023
21d016f
services/horizon/internal/ingest/processors: Refactor liquidity pools…
tamirms Aug 25, 2023
8775648
services/horizon/internal/ingest/processors: Refactor effects process…
tamirms Aug 31, 2023
98c135c
integrating new loaders and builders into processors (#5083)
sreuland Oct 26, 2023
299d386
Merge remote-tracking branch 'upstream/master' into ingestion-next
sreuland Oct 31, 2023
59cc1f0
update new processor, batch loader to work with latest from master
sreuland Oct 31, 2023
2be340b
#4909: review feedback on err handling in processor
sreuland Oct 31, 2023
769a287
#4909: convert byte[] values to string before sending to fast batch b…
sreuland Nov 1, 2023
1efab1f
fixed dependecnies for verify-range to to get go version dynmaically …
sreuland Nov 1, 2023
4af6d44
Merge pull request #5096 from sreuland/ingestion-next
sreuland Nov 1, 2023
54e0f52
Merge remote-tracking branch 'upstream/master' into ingestion-next
sreuland Nov 1, 2023
0d42a41
Merge pull request #5102 from sreuland/ingestion-next
sreuland Nov 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/horizon/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'
services:
horizon-postgres:
platform: linux/amd64
image: postgres:12-bullseye
image: postgres:postgres:12-bullseye
restart: on-failure
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/docker/verify-range/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ENV STELLAR_CORE_VERSION=${STELLAR_CORE_VERSION:-*}
ENV DEBIAN_FRONTEND=noninteractive

ADD dependencies /
RUN ["chmod", "+x", "dependencies"]
RUN ["chmod", "+x", "/dependencies"]
RUN /dependencies

ADD stellar-core.cfg /
Expand Down
6 changes: 5 additions & 1 deletion services/horizon/internal/action_offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ func TestOfferActions_Show(t *testing.T) {
ht.Assert.NoError(err)

ledgerCloseTime := time.Now().Unix()
_, err = q.InsertLedger(ctx, xdr.LedgerHeaderHistoryEntry{
ht.Assert.NoError(q.Begin(ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 100,
ScpValue: xdr.StellarValue{
Expand All @@ -33,6 +35,8 @@ func TestOfferActions_Show(t *testing.T) {
},
}, 0, 0, 0, 0, 0)
ht.Assert.NoError(err)
ht.Assert.NoError(ledgerBatch.Exec(ht.Ctx, q))
ht.Assert.NoError(q.Commit())

issuer := xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")
nativeAsset := xdr.MustNewNativeAsset()
Expand Down
19 changes: 16 additions & 3 deletions services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ func TestAccountInfo(t *testing.T) {
assert.NoError(t, err)

ledgerFourCloseTime := time.Now().Unix()
_, err = q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{
assert.NoError(t, q.Begin(tt.Ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 4,
ScpValue: xdr.StellarValue{
Expand All @@ -237,6 +239,8 @@ func TestAccountInfo(t *testing.T) {
},
}, 0, 0, 0, 0, 0)
assert.NoError(t, err)
assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q))
assert.NoError(t, q.Commit())

account, err := AccountInfo(tt.Ctx, &history.Q{tt.HorizonSession()}, accountID)
tt.Assert.NoError(err)
Expand Down Expand Up @@ -408,7 +412,9 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) {
err := q.UpsertAccounts(tt.Ctx, []history.AccountEntry{account1, account2})
assert.NoError(t, err)
ledgerCloseTime := time.Now().Unix()
_, err = q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{
assert.NoError(t, q.Begin(tt.Ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 1234,
ScpValue: xdr.StellarValue{
Expand All @@ -417,6 +423,8 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) {
},
}, 0, 0, 0, 0, 0)
assert.NoError(t, err)
assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q))
assert.NoError(t, q.Commit())

for _, row := range accountSigners {
_, err = q.CreateAccountSigner(tt.Ctx, row.Account, row.Signer, row.Weight, nil)
Expand Down Expand Up @@ -511,7 +519,9 @@ func TestGetAccountsHandlerPageResultsByLiquidityPool(t *testing.T) {
assert.NoError(t, err)

ledgerCloseTime := time.Now().Unix()
_, err = q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{
assert.NoError(t, q.Begin(tt.Ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 1234,
ScpValue: xdr.StellarValue{
Expand All @@ -520,6 +530,9 @@ func TestGetAccountsHandlerPageResultsByLiquidityPool(t *testing.T) {
},
}, 0, 0, 0, 0, 0)
assert.NoError(t, err)
assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q))
assert.NoError(t, q.Commit())

var assetType, code, issuer string
usd.MustExtract(&assetType, &code, &issuer)
params := map[string]string{
Expand Down
16 changes: 12 additions & 4 deletions services/horizon/internal/actions/offer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,19 @@ func TestGetOfferByIDHandler(t *testing.T) {
handler := GetOfferByID{}

ledgerCloseTime := time.Now().Unix()
_, err := q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{
assert.NoError(t, q.Begin(tt.Ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err := ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 3,
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(ledgerCloseTime),
},
},
}, 0, 0, 0, 0, 0)
tt.Assert.NoError(err)
assert.NoError(t, err)
assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q))
assert.NoError(t, q.Commit())

err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, usdOffer})
tt.Assert.NoError(err)
Expand Down Expand Up @@ -186,15 +190,19 @@ func TestGetOffersHandler(t *testing.T) {
handler := GetOffersHandler{}

ledgerCloseTime := time.Now().Unix()
_, err := q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{
assert.NoError(t, q.Begin(tt.Ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err := ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 3,
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(ledgerCloseTime),
},
},
}, 0, 0, 0, 0, 0)
tt.Assert.NoError(err)
assert.NoError(t, err)
assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q))
assert.NoError(t, q.Commit())

err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer})
tt.Assert.NoError(err)
Expand Down
30 changes: 19 additions & 11 deletions services/horizon/internal/actions/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ func TestInvokeHostFnDetailsInPaymentOperations(t *testing.T) {
opID1 := toid.New(sequence, txIndex, 1).ToInt64()

ledgerCloseTime := time.Now().Unix()
_, err := q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(sequence),
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(ledgerCloseTime),
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err := ledgerBatch.Add(
xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(sequence),
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(ledgerCloseTime),
},
},
},
}, 1, 0, 1, 0, 0)
}, 1, 0, 1, 0, 0)
tt.Assert.NoError(err)
tt.Assert.NoError(q.Begin(tt.Ctx))
tt.Assert.NoError(ledgerBatch.Exec(tt.Ctx, q))

transactionBuilder := q.NewTransactionBatchInsertBuilder(1)
transactionBuilder := q.NewTransactionBatchInsertBuilder()
firstTransaction := buildLedgerTransaction(tt.T, testTransaction{
index: uint32(txIndex),
envelopeXDR: "AAAAACiSTRmpH6bHC6Ekna5e82oiGY5vKDEEUgkq9CB//t+rAAAAyAEXUhsAADDRAAAAAAAAAAAAAAABAAAAAAAAAAsBF1IbAABX4QAAAAAAAAAA",
Expand All @@ -55,11 +59,13 @@ func TestInvokeHostFnDetailsInPaymentOperations(t *testing.T) {
metaXDR: "AAAAAQAAAAAAAAAA",
hash: "19aaa18db88605aedec04659fb45e06f240b022eb2d429e05133e4d53cd945ba",
})
err = transactionBuilder.Add(tt.Ctx, firstTransaction, uint32(sequence))
err = transactionBuilder.Add(firstTransaction, uint32(sequence))
tt.Assert.NoError(err)
tt.Assert.NoError(transactionBuilder.Exec(tt.Ctx, q))

operationBuilder := q.NewOperationBatchInsertBuilder()

operationBuilder := q.NewOperationBatchInsertBuilder(1)
err = operationBuilder.Add(tt.Ctx,
err = operationBuilder.Add(
opID1,
txID,
1,
Expand Down Expand Up @@ -118,6 +124,8 @@ func TestInvokeHostFnDetailsInPaymentOperations(t *testing.T) {
null.String{},
true)
tt.Assert.NoError(err)
tt.Assert.NoError(operationBuilder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

records, err := handler.GetResourcePage(
httptest.NewRecorder(),
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/actions/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func checkOuterHashResponse(
}

func TestFeeBumpTransactionPage(t *testing.T) {

tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
Expand Down
7 changes: 6 additions & 1 deletion services/horizon/internal/actions_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ func TestAccountActions_InvalidID(t *testing.T) {
ht.Assert.NoError(err)
err = q.UpdateIngestVersion(ht.Ctx, ingest.CurrentVersion)
ht.Assert.NoError(err)
_, err = q.InsertLedger(ht.Ctx, xdr.LedgerHeaderHistoryEntry{

ht.Assert.NoError(q.Begin(ht.Ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 100,
},
}, 0, 0, 0, 0, 0)
ht.Assert.NoError(err)
ht.Assert.NoError(ledgerBatch.Exec(ht.Ctx, q))
ht.Assert.NoError(q.Commit())

// existing account
w := ht.Get(
Expand Down
6 changes: 5 additions & 1 deletion services/horizon/internal/actions_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ func TestDataActions_Show(t *testing.T) {
ht.Assert.NoError(err)
err = q.UpdateIngestVersion(ht.Ctx, ingest.CurrentVersion)
ht.Assert.NoError(err)
_, err = q.InsertLedger(ht.Ctx, xdr.LedgerHeaderHistoryEntry{
ht.Assert.NoError(q.Begin(ht.Ctx))
ledgerBatch := q.NewLedgerBatchInsertBuilder()
err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 100,
},
}, 0, 0, 0, 0, 0)
ht.Assert.NoError(err)
ht.Assert.NoError(ledgerBatch.Exec(ht.Ctx, q))
ht.Assert.NoError(q.Commit())

err = q.UpsertAccountData(ht.Ctx, []history.Data{data1, data2})
assert.NoError(t, err)
Expand Down
16 changes: 13 additions & 3 deletions services/horizon/internal/actions_trade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,8 +820,11 @@ func IngestTestTrade(
return err
}

batch := q.NewTradeBatchInsertBuilder(0)
batch.Add(ctx, history.InsertTrade{
if err = q.Begin(ctx); err != nil {
return err
}
batch := q.NewTradeBatchInsertBuilder()
err = batch.Add(history.InsertTrade{
HistoryOperationID: opCounter,
Order: 0,
CounterAssetID: assets[assetBought.String()].ID,
Expand All @@ -839,7 +842,10 @@ func IngestTestTrade(

Type: history.OrderbookTradeType,
})
err = batch.Exec(ctx)
if err != nil {
return err
}
err = batch.Exec(ctx, q)
if err != nil {
return err
}
Expand All @@ -849,6 +855,10 @@ func IngestTestTrade(
return err
}

if err := q.Commit(); err != nil {
return err
}

return nil
}

Expand Down
Loading
Loading