Skip to content

Commit

Permalink
exp/services/ledgerexporter: refactor ledgerexporter cli into sub-com…
Browse files Browse the repository at this point in the history
…mands (#5335)

#hubble-271: create append and scan-and-fill sub-commands, allow captive core config override
  • Loading branch information
sreuland authored Jun 13, 2024
1 parent a9391b0 commit 4579697
Show file tree
Hide file tree
Showing 45 changed files with 874 additions and 361 deletions.
51 changes: 31 additions & 20 deletions exp/services/ledgerexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,56 @@ type LedgerCloseMetaBatch struct {

### Command Line Options

#### Bounded Mode:
Exports a specific range of ledgers, defined by --start and --end.
#### Scan and Fill Mode:
Exports a specific range of ledgers, defined by --start and --end. Will only export to remote datastore if data is absent.
```bash
ledgerexporter --start <start_ledger> --end <end_ledger> --config-file <config_file_path>
ledgerexporter scan-and-fill --start <start_ledger> --end <end_ledger> --config-file <config_file_path>
```

#### Unbounded Mode:
Exports ledgers continuously starting from --start. In this mode, the end ledger is either not provided or set to 0.
#### Append Mode:
Exports ledgers initially searching from --start, looking for the next absent ledger sequence number proceeding --start on the data store. If abscence is detected, the export range is narrowed to `--start <absent_ledger_sequence>`.
This feature requires ledgers to be present on the remote data store for some (possibly empty) prefix of the requested range and then absent for the (possibly empty) remainder.

In this mode, the --end ledger can be provided to stop the process once export has reached that ledger, or if absent or 0 it will result in continous exporting of new ledgers emitted from the network.

It’s guaranteed that ledgers exported during `append` mode from `start` and up to the last logged ledger file `Uploaded {ledger file name}` were contiguous, meaning all ledgers within that range were exported to the data lake with no gaps or missing ledgers in between.
```bash
ledgerexporter --start <start_ledger> --config-file <config_file_path>
ledgerexporter append --start <start_ledger> --config-file <config_file_path>
```

#### Resumability:
Exporting a ledger range can be optimized further by enabling resumability if the remote data store supports it.
### Configuration (toml):
The `stellar_core_config` supports two ways for configuring captive core:
- use prebuilt captive core config toml, archive urls, and passphrase based on `stellar_core_config.network = testnet|pubnet`.
- manually set the the captive core confg by supplying these core parameters which will override any defaults when `stellar_core_config.network` is present also:
`stellar_core_config.captive_core_toml_path`
`stellar_core_config.history_archive_urls`
`stellar_core_config.network_passphrase`

By default, resumability is disabled, `--resume false`
Ensure you have stellar-core installed and set `stellar_core_config.stellar_core_binary_path` to it's path on o/s.

When enabled, `--resume true`, ledgerexporter will search the remote data store within the requested range, looking for the oldest absent ledger sequence number within range. If abscence is detected, the export range is narrowed to `--start <absent_ledger_sequence>`.
This feature requires all ledgers to be present on the remote data store for some (possibly empty) prefix of the requested range and then absent for the (possibly empty) remainder.

### Configuration (toml):
Enable web service that will be bound to localhost post and publishes metrics by including `admin_port = {port}`

An example config, demonstrating preconfigured captive core settings and gcs data store config.
```toml
network = "testnet" # Options: `testnet` or `pubnet`
admin_port = 6061

[datastore_config]
type = "GCS"

[datastore_config.params]
destination_bucket_path = "your-bucket-name/<optional_subpaths>"
destination_bucket_path = "your-bucket-name/<optional_subpath1>/<optional_subpath2>/"

[exporter_config]
[datastore_config.schema]
ledgers_per_file = 64
files_per_partition = 10
```

#### Stellar-core configuration:
- The exporter automatically configures stellar-core based on the network specified in the config.
- Ensure you have stellar-core installed and accessible in your system's $PATH.
[stellar_core_config]
network = "testnet"
stellar_core_binary_path = "/my/path/to/stellar-core"
captive_core_toml_path = "my-captive-core.cfg"
history_archive_urls = ["http://testarchiveurl1", "http://testarchiveurl2"]
network_passphrase = "test"
```

### Exported Files

Expand Down
11 changes: 5 additions & 6 deletions exp/services/ledgerexporter/config.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
network = "testnet"

[datastore_config]
type = "GCS"

[datastore_config.params]
destination_bucket_path = "exporter-test/ledgers"
destination_bucket_path = "exporter-test/ledgers/testnet"

[exporter_config]
ledgers_per_file = 1
files_per_partition = 64000
[datastore_config.schema]
ledgers_per_file = 1
files_per_partition = 64000

[stellar_core_config]
network = "testnet"
stellar_core_binary_path = "/usr/local/bin/stellar-core"

10 changes: 6 additions & 4 deletions exp/services/ledgerexporter/docker/start
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@ files_per_partition="${FILES_PER_PARTITION:-64000}"

# Generate TOML configuration
cat <<EOF > config.toml
network = "${NETWORK}"
[datastore_config]
type = "GCS"
[datastore_config.params]
destination_bucket_path = "${ARCHIVE_TARGET}"
destination_bucket_path = "${ARCHIVE_TARGET}/${NETWORK}"
[exporter_config]
[datastore_config.schema]
ledgers_per_file = $ledgers_per_file
files_per_partition = $files_per_partition
[stellar_core_config]
network = "${NETWORK}"
EOF

# Check if START or END variables are set
if [[ -n "$START" || -n "$END" ]]; then
echo "START: $START END: $END"
/usr/bin/ledgerexporter --config-file config.toml --start $START --end $END
/usr/bin/ledgerexporter scan-and-fill --config-file config.toml --start $START --end $END
else
echo "Error: No ledger range provided."
exit 1
Expand Down
50 changes: 30 additions & 20 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -72,7 +73,7 @@ type InvalidDataStoreError struct {
func (m InvalidDataStoreError) Error() string {
return fmt.Sprintf("The remote data store has inconsistent data, "+
"a resumable starting ledger of %v was identified, "+
"but that is not aligned to expected ledgers-per-file of %v. use '--resume false' to bypass",
"but that is not aligned to expected ledgers-per-file of %v. use 'scan-and-fill' sub-command to bypass",
m.LedgerSequence, m.LedgersPerFile)
}

Expand All @@ -82,17 +83,16 @@ type App struct {
dataStore datastore.DataStore
exportManager *ExportManager
uploader Uploader
flags Flags
adminServer *http.Server
}

func NewApp(flags Flags) *App {
func NewApp() *App {
logger.SetLevel(log.DebugLevel)
app := &App{flags: flags}
app := &App{}
return app
}

func (a *App) init(ctx context.Context) error {
func (a *App) init(ctx context.Context, runtimeSettings RuntimeSettings) error {
var err error
var archive historyarchive.ArchiveInterface

Expand All @@ -104,20 +104,22 @@ func (a *App) init(ctx context.Context) error {
collectors.NewGoCollector(),
)

if a.config, err = NewConfig(a.flags); err != nil {
if a.config, err = NewConfig(runtimeSettings); err != nil {
return errors.Wrap(err, "Could not load configuration")
}
if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil {
if archive, err = a.config.GenerateHistoryArchive(ctx); err != nil {
return err
}
if err = a.config.ValidateAndSetLedgerRange(ctx, archive); err != nil {
return err
}
a.config.ValidateAndSetLedgerRange(ctx, archive)

if a.dataStore, err = datastore.NewDataStore(ctx, a.config.DataStoreConfig, a.config.Network); err != nil {
if a.dataStore, err = datastore.NewDataStore(ctx, a.config.DataStoreConfig); err != nil {
return errors.Wrap(err, "Could not connect to destination data store")
}
if a.config.Resume {
if a.config.Resumable() {
if err = a.applyResumability(ctx,
datastore.NewResumableManager(a.dataStore, a.config.Network, a.config.LedgerBatchConfig, archive)); err != nil {
datastore.NewResumableManager(a.dataStore, a.config.DataStoreConfig.Schema, archive)); err != nil {
return err
}
}
Expand All @@ -129,7 +131,10 @@ func (a *App) init(ctx context.Context) error {
}

queue := NewUploadQueue(uploadQueueCapacity, registry)
if a.exportManager, err = NewExportManager(a.config, a.ledgerBackend, queue, registry); err != nil {
if a.exportManager, err = NewExportManager(a.config.DataStoreConfig.Schema,
a.ledgerBackend, queue, registry,
a.config.StellarCoreConfig.NetworkPassphrase,
a.config.CoreVersion); err != nil {
return err
}
a.uploader = NewUploader(a.dataStore, queue, registry)
Expand All @@ -151,8 +156,8 @@ func (a *App) applyResumability(ctx context.Context, resumableManager datastore.

// TODO - evaluate a more robust validation of remote data for ledgers-per-file consistency
// this assumes ValidateAndSetLedgerRange() has conditioned the a.config.StartLedger to be at least > 1
if absentLedger > 2 && absentLedger != a.config.LedgerBatchConfig.GetSequenceNumberStartBoundary(absentLedger) {
return NewInvalidDataStoreError(absentLedger, a.config.LedgerBatchConfig.LedgersPerFile)
if absentLedger > 2 && absentLedger != a.config.DataStoreConfig.Schema.GetSequenceNumberStartBoundary(absentLedger) {
return NewInvalidDataStoreError(absentLedger, a.config.DataStoreConfig.Schema.LedgersPerFile)
}
logger.Infof("For export ledger range start=%d, end=%d, the remote storage has some of this data already, will resume at later start ledger of %d", a.config.StartLedger, a.config.EndLedger, absentLedger)
a.config.StartLedger = absentLedger
Expand Down Expand Up @@ -180,18 +185,19 @@ func newAdminServer(adminPort int, prometheusRegistry *prometheus.Registry) *htt
}
}

func (a *App) Run() {
func (a *App) Run(runtimeSettings RuntimeSettings) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := a.init(ctx); err != nil {
var dataAlreadyExported DataAlreadyExportedError
if err := a.init(ctx, runtimeSettings); err != nil {
var dataAlreadyExported *DataAlreadyExportedError
if errors.As(err, &dataAlreadyExported) {
logger.Info(err.Error())
logger.Info("Shutting down ledger-exporter")
return
return nil
}
logger.WithError(err).Fatal("Stopping ledger-exporter")
logger.WithError(err).Error("Stopping ledger-exporter")
return err
}
defer a.close()

Expand Down Expand Up @@ -254,12 +260,16 @@ func (a *App) Run() {
logger.WithError(err).Warn("error in internalServer.Shutdown")
}
}
return nil
}

// newLedgerBackend Creates and initializes captive core ledger backend
// Currently, only supports captive-core as ledger backend
func newLedgerBackend(config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
captiveConfig, err := config.generateCaptiveCoreConfig()
// best effort check on a core bin available from PATH to provide as default if
// no core bin is provided from config.
coreBinFromPath, _ := exec.LookPath("stellar-core")
captiveConfig, err := config.GenerateCaptiveCoreConfig(coreBinFromPath)
if err != nil {
return nil, err
}
Expand Down
36 changes: 18 additions & 18 deletions exp/services/ledgerexporter/internal/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestApplyResumeHasStartError(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{StartLedger: 10, EndLedger: 19, Resume: true}
app.config = &Config{StartLedger: 10, EndLedger: 19, Mode: Append}
mockResumableManager := &datastore.MockResumableManager{}
mockResumableManager.On("FindStart", ctx, uint32(10), uint32(19)).Return(uint32(0), false, errors.New("start error")).Once()

Expand All @@ -24,7 +24,7 @@ func TestApplyResumeHasStartError(t *testing.T) {
func TestApplyResumeDatastoreComplete(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{StartLedger: 10, EndLedger: 19, Resume: true}
app.config = &Config{StartLedger: 10, EndLedger: 19, Mode: Append}
mockResumableManager := &datastore.MockResumableManager{}
mockResumableManager.On("FindStart", ctx, uint32(10), uint32(19)).Return(uint32(0), false, nil).Once()

Expand All @@ -38,10 +38,10 @@ func TestApplyResumeInvalidDataStoreLedgersPerFileBoundary(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{
StartLedger: 3,
EndLedger: 9,
Resume: true,
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
StartLedger: 3,
EndLedger: 9,
Mode: Append,
DataStoreConfig: datastore.DataStoreConfig{Schema: datastore.DataStoreSchema{LedgersPerFile: 10, FilesPerPartition: 50}},
}
mockResumableManager := &datastore.MockResumableManager{}
// simulate the datastore has inconsistent data,
Expand All @@ -58,10 +58,10 @@ func TestApplyResumeWithPartialRemoteDataPresent(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{
StartLedger: 10,
EndLedger: 99,
Resume: true,
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
StartLedger: 10,
EndLedger: 99,
Mode: Append,
DataStoreConfig: datastore.DataStoreConfig{Schema: datastore.DataStoreSchema{LedgersPerFile: 10, FilesPerPartition: 50}},
}
mockResumableManager := &datastore.MockResumableManager{}
// simulates a data store that had ledger files populated up to seq=49, so the first absent ledger would be 50
Expand All @@ -77,10 +77,10 @@ func TestApplyResumeWithNoRemoteDataPresent(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{
StartLedger: 10,
EndLedger: 99,
Resume: true,
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
StartLedger: 10,
EndLedger: 99,
Mode: Append,
DataStoreConfig: datastore.DataStoreConfig{Schema: datastore.DataStoreSchema{LedgersPerFile: 10, FilesPerPartition: 50}},
}
mockResumableManager := &datastore.MockResumableManager{}
// simulates a data store that had no data in the requested range
Expand All @@ -99,10 +99,10 @@ func TestApplyResumeWithNoRemoteDataAndRequestFromGenesis(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{
StartLedger: 2,
EndLedger: 99,
Resume: true,
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
StartLedger: 2,
EndLedger: 99,
Mode: Append,
DataStoreConfig: datastore.DataStoreConfig{Schema: datastore.DataStoreSchema{LedgersPerFile: 10, FilesPerPartition: 50}},
}
mockResumableManager := &datastore.MockResumableManager{}
// simulates a data store that had no data in the requested range
Expand Down
Loading

0 comments on commit 4579697

Please sign in to comment.