Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the latest version of the Stellar ingestion library #122

Merged
merged 5 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/export_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"os"

"github.com/spf13/cobra"
"github.com/stellar/go/xdr"

"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/xdr"
)

var accountsCmd = &cobra.Command{
Expand Down
7 changes: 4 additions & 3 deletions cmd/export_ledgers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ var executableName = "stellar-etl"
var archiveURL = "http://history.stellar.org/prd/core-live/core_live_001"
var latestLedger = getLastSeqNum()
var update = flag.Bool("update", false, "update the golden files of this test")
var backend, _ = utils.CreateBackend()

type cliTest struct {
name string
Expand All @@ -42,7 +41,6 @@ func TestMain(m *testing.M) {

flag.Parse()
exitCode := m.Run()
backend.Close()
os.Exit(exitCode)
}

Expand Down Expand Up @@ -148,7 +146,10 @@ func removeCoreLogging(loggerOutput string) string {
}

func getLastSeqNum() uint32 {
num, _ := backend.GetLatestLedgerSequence()
num, err := utils.GetLatestLedgerSequence()
if err != nil {
panic(err)
}
return num
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/export_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"os"

"github.com/spf13/cobra"
"github.com/stellar/go/xdr"

"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/xdr"
)

// offersCmd represents the offers command
Expand Down
4 changes: 3 additions & 1 deletion cmd/export_orderbooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"path/filepath"

"github.com/spf13/cobra"
"github.com/stellar/go/xdr"

"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/xdr"
)

// exportOrderbooksCmd represents the exportOrderbooks command
Expand Down
4 changes: 3 additions & 1 deletion cmd/export_trustlines.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"os"

"github.com/spf13/cobra"
"github.com/stellar/go/xdr"

"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/xdr"
)

// trustlinesCmd represents the trustlines command
Expand Down
4 changes: 2 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"fmt"
"os"

"github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"

homedir "github.com/mitchellh/go-homedir"
"github.com/spf13/viper"

"github.com/stellar/go/support/log"
)

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ require (
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 // indirect
github.com/gdexlab/go-render v1.0.1
github.com/getlantern/deepcopy v0.0.0-20160317154340-7f45deb8130a
github.com/gdexlab/go-render v1.0.1 // indirect
github.com/getlantern/deepcopy v0.0.0-20160317154340-7f45deb8130a // indirect
github.com/go-chi/chi v4.1.2+incompatible // indirect
github.com/go-errors/errors v1.1.1 // indirect
github.com/go-sql-driver/mysql v1.5.0 // indirect
Expand Down Expand Up @@ -60,7 +60,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/stellar/go v0.0.0-20201218151145-990a09044ba9
github.com/stellar/go v0.0.0-20210219143447-0e85041a345b
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/ugorji/go v1.1.4 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,18 @@ github.com/stellar/go v0.0.0-20201217225215-bb1a017ef327 h1:k0aXRJLOYUhiclG4xdDY
github.com/stellar/go v0.0.0-20201217225215-bb1a017ef327/go.mod h1:u39t8VPN26U8w6UaGoVqWN4e9rdTVNCji/Q0HlZqIbY=
github.com/stellar/go v0.0.0-20201218151145-990a09044ba9 h1:frpZn2I+F1M9kpBGX1UgMPmaad1muqa0P14uVvHcu6o=
github.com/stellar/go v0.0.0-20201218151145-990a09044ba9/go.mod h1:u39t8VPN26U8w6UaGoVqWN4e9rdTVNCji/Q0HlZqIbY=
github.com/stellar/go v0.0.0-20210119090801-9791cbfaaa0b h1:xLW3PNG4pI6ReDz0m0cMMLeGQp+ke/gNX7dTmXaEOzU=
github.com/stellar/go v0.0.0-20210119090801-9791cbfaaa0b/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go v0.0.0-20210208222255-f071f3b0df9a h1:e83OUc9ndYZMpCybfNsffruzJa5aHPahmqNVjk4ppu8=
github.com/stellar/go v0.0.0-20210208222255-f071f3b0df9a/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go v0.0.0-20210210172739-2df91a0a5d4f h1:ESi1imNB0dljhgn0/NM9dQqYzUNbXPksrBhSD6c5N7Y=
github.com/stellar/go v0.0.0-20210210172739-2df91a0a5d4f/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go v0.0.0-20210216193041-be0aebd7aca8 h1:eh3kZgq981wd9qIKumsI1jAtEkt4P62VESccrBB2zRA=
github.com/stellar/go v0.0.0-20210216193041-be0aebd7aca8/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go v0.0.0-20210217212725-e9c8064a905d h1:/kFJdDhyszJOFV1w74oeQtlcnuJQ1wHkMxp0z+9+uWE=
github.com/stellar/go v0.0.0-20210217212725-e9c8064a905d/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go v0.0.0-20210219143447-0e85041a345b h1:Ck5BVI8H1HQIk0vR+WxyewLB4pc3yn8sU3+9OurasIQ=
github.com/stellar/go v0.0.0-20210219143447-0e85041a345b/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go-xdr v0.0.0-20200331223602-71a1e6d555f2 h1:K9H+A+eWe8ZlnpNha+pXbEK+jtIluQp/2dKxkK8k7OE=
github.com/stellar/go-xdr v0.0.0-20200331223602-71a1e6d555f2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps=
github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a h1:GnM0ArRp7EDbaTiFhSp/CLgyk2cacXxdUklqJmdJs1Q=
Expand Down
25 changes: 8 additions & 17 deletions internal/input/assets.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,31 @@
package input

import (
ingestio "github.com/stellar/go/ingest/io"
"github.com/stellar/go/xdr"
"io"

"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

// GetPaymentOperations returns a slice of payment operations that can include new assets from the ledgers in the provided range (inclusive on both ends)
func GetPaymentOperations(start, end uint32, limit int64) ([]OperationTransformInput, error) {
backend, err := utils.CreateBackend()
if err != nil {
return []OperationTransformInput{}, err
}

defer backend.Close()

latestNum, err := backend.GetLatestLedgerSequence()
if err != nil {
return []OperationTransformInput{}, err
}

err = validateLedgerRange(start, end, latestNum)
backend, err := utils.CreateBackend(start, end)
if err != nil {
return []OperationTransformInput{}, err
}

opSlice := []OperationTransformInput{}
for seq := start; seq <= end; seq++ {
txReader, err := ingestio.NewLedgerTransactionReader(backend, publicPassword, seq)
txReader, err := ingest.NewLedgerTransactionReader(backend, publicPassword, seq)
if err != nil {
return []OperationTransformInput{}, err
}

for int64(len(opSlice)) < limit || limit < 0 {
tx, err := txReader.Read()
if err == ingestio.EOF {
if err == io.EOF {
break
}

Expand Down
43 changes: 18 additions & 25 deletions internal/input/bucketlist_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,55 @@ package input

import (
"context"

"github.com/stellar/stellar-etl/internal/utils"
"io"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/adapters"
ingestio "github.com/stellar/go/ingest/io"
"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

var archiveStellarURL = "http://history.stellar.org/prd/core-live/core_live_001"
"github.com/stellar/stellar-etl/internal/utils"
)

// GetEntriesFromGenesis returns a slice of ledger entries of the specified type for the ledgers starting from the genesis ledger and ending at end (inclusive)
func GetEntriesFromGenesis(end uint32, entryType xdr.LedgerEntryType) ([]ingestio.Change, error) {
archive, err := historyarchive.Connect(
archiveStellarURL,
historyarchive.ConnectOptions{Context: context.Background()},
)
func GetEntriesFromGenesis(end uint32, entryType xdr.LedgerEntryType) ([]ingest.Change, error) {
archive, err := utils.CreateHistoryArchiveClient()
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

historyAdapter := adapters.MakeHistoryArchiveAdapter(archive)
latestNum, err := historyAdapter.GetLatestLedgerSequence()
latestNum, err := utils.GetLatestLedgerSequence()
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

err = validateLedgerRange(1, end, latestNum)
if err != nil {
return []ingestio.Change{}, err
if err = utils.ValidateLedgerRange(1, end, latestNum); err != nil {
return []ingest.Change{}, err
}

checkpointSeq, err := utils.GetCheckpointNum(end, latestNum)
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

return readBucketList(archive, checkpointSeq, entryType)
}

func readBucketList(archive *historyarchive.Archive, checkpointSeq uint32, entryType xdr.LedgerEntryType) ([]ingestio.Change, error) {
changeReader, err := ingestio.MakeSingleLedgerStateReader(context.Background(), archive, checkpointSeq)
func readBucketList(archive historyarchive.ArchiveInterface, checkpointSeq uint32, entryType xdr.LedgerEntryType) ([]ingest.Change, error) {
changeReader, err := ingest.NewCheckpointChangeReader(context.Background(), archive, checkpointSeq)
defer changeReader.Close()
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

entrySlice := []ingestio.Change{}
entrySlice := []ingest.Change{}
for {
change, err := changeReader.Read()
if err == ingestio.EOF {
if err == io.EOF {
break
}

if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

if change.Type == entryType {
Expand Down
45 changes: 15 additions & 30 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,37 @@ package input

import (
"fmt"
"io"
"math"

ingestio "github.com/stellar/go/ingest/io"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"
)

const password = network.PublicNetworkPassphrase

// ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd)
type ChangeBatch struct {
Changes []ingestio.Change
Changes []ingest.Change
BatchStart uint32
BatchEnd uint32
Type xdr.LedgerEntryType
}

func getLatestLedgerNumber() (uint32, error) {
backend, err := utils.CreateBackend()
if err != nil {
return 0, err
}

defer backend.Close()

latestNum, err := backend.GetLatestLedgerSequence()
if err != nil {
return 0, err
}

return latestNum, nil
}

// PrepareCaptiveCore creates a new captive core instance and prepares it with the given range. The range is unbounded when end = 0, and is bounded and validated otherwise
func PrepareCaptiveCore(execPath, configPath string, start, end uint32) (*ledgerbackend.CaptiveStellarCore, error) {
captiveBackend, err := ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
BinaryPath: execPath,
ConfigAppendPath: configPath,
NetworkPassphrase: password,
HistoryArchiveURLs: []string{archiveStellarURL},
HistoryArchiveURLs: utils.ArchiveURLs,
},
)
if err != nil {
Expand All @@ -57,13 +43,12 @@ func PrepareCaptiveCore(execPath, configPath string, start, end uint32) (*ledger

if end != 0 {
ledgerRange = ledgerbackend.BoundedRange(start, end)
latest, err := getLatestLedgerNumber()
latest, err := utils.GetLatestLedgerSequence()
if err != nil {
return &ledgerbackend.CaptiveStellarCore{}, err
}

err = validateLedgerRange(start, end, latest)
if err != nil {
if err = utils.ValidateLedgerRange(start, end, latest); err != nil {
return &ledgerbackend.CaptiveStellarCore{}, err
}
}
Expand Down Expand Up @@ -112,10 +97,10 @@ func closeChannels(accChannel, offChannel, trustChannel chan ChangeBatch) {
}
}

func addLedgerChangesToCache(changeReader *ingestio.LedgerChangeReader, accCache, offCache, trustCache *ingestio.LedgerEntryChangeCache) error {
func addLedgerChangesToCache(changeReader *ingest.LedgerChangeReader, accCache, offCache, trustCache *ingest.ChangeCompactor) error {
for {
change, err := changeReader.Read()
if err == ingestio.EOF {
if err == io.EOF {
return nil
}

Expand Down Expand Up @@ -147,9 +132,9 @@ func addLedgerChangesToCache(changeReader *ingestio.LedgerChangeReader, accCache

// exportBatch gets the changes from the ledgers in the range [batchStart, batchEnd), compacts them, and sends them to the proper channels
func exportBatch(batchStart, batchEnd uint32, core *ledgerbackend.CaptiveStellarCore, accChannel, offChannel, trustChannel chan ChangeBatch, logger *log.Entry) {
accChanges := ingestio.NewLedgerEntryChangeCache()
offChanges := ingestio.NewLedgerEntryChangeCache()
trustChanges := ingestio.NewLedgerEntryChangeCache()
accChanges := ingest.NewChangeCompactor()
offChanges := ingest.NewChangeCompactor()
trustChanges := ingest.NewChangeCompactor()
for seq := batchStart; seq < batchEnd; {
latestLedger, err := core.GetLatestLedgerSequence()
if err != nil {
Expand All @@ -159,7 +144,7 @@ func exportBatch(batchStart, batchEnd uint32, core *ledgerbackend.CaptiveStellar
// if this ledger is available, we process its changes and move on to the next ledger by incrementing seq.
// Otherwise, nothing is incremented and we try again on the next iteration of the loop
if seq <= latestLedger {
changeReader, err := ingestio.NewLedgerChangeReader(core, password, seq)
changeReader, err := ingest.NewLedgerChangeReader(core, password, seq)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/input/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/stretchr/testify/assert"

ingestio "github.com/stellar/go/ingest/io"
"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -114,8 +114,8 @@ func TestSendBatchToChannel(t *testing.T) {
}

func wrapLedgerEntry(entry xdr.LedgerEntry) ChangeBatch {
changes := []ingestio.Change{
ingestio.Change{Type: entry.Data.Type, Post: &entry},
changes := []ingest.Change{
{Type: entry.Data.Type, Post: &entry},
}
return ChangeBatch{
Changes: changes,
Expand Down
Loading