Skip to content

Commit

Permalink
/exp/services/ledgerexporter: resumable export, check data storage fo…
Browse files Browse the repository at this point in the history
…r optimal starting point (stellar#5264)
  • Loading branch information
sreuland authored Apr 30, 2024
1 parent 5f50829 commit 9808f37
Show file tree
Hide file tree
Showing 32 changed files with 1,350 additions and 489 deletions.
19 changes: 11 additions & 8 deletions exp/services/ledgerexporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ $(if $(STELLAR_CORE_VERSION), --build-arg STELLAR_CORE_VERSION=$(STELLAR_CORE_VE
-t $(DOCKER_IMAGE):$(VERSION) \
-t $(DOCKER_IMAGE):latest .

docker-test:
docker-clean:
$(SUDO) docker stop fake-gcs-server || true
$(SUDO) docker rm fake-gcs-server || true
$(SUDO) rm -rf ${PWD}/storage || true
$(SUDO) docker network rm test-network || true

docker-test: docker-clean
# Create temp storage dir
$(SUDO) mkdir -p ${PWD}/storage/exporter-test
$(SUDO) mkdir -p ${PWD}/storage/exporter-test

# Create test network for docker
$(SUDO) docker network create test-network
Expand All @@ -28,16 +34,13 @@ docker-test:
# Run the ledger-exporter
$(SUDO) docker run --platform linux/amd64 -t --network test-network\
-e NETWORK=pubnet \
-e ARCHIVE_TARGET=gcs://exporter-test \
-e ARCHIVE_TARGET=exporter-test/test-subpath \
-e START=1000 \
-e END=2000 \
-e STORAGE_EMULATOR_HOST=http://fake-gcs-server:4443 \
$(DOCKER_IMAGE):$(VERSION)

$(SUDO) docker stop fake-gcs-server
$(SUDO) docker rm fake-gcs-server
$(SUDO) rm -rf ${PWD}/storage
$(SUDO) docker network rm test-network

$(MAKE) docker-clean

docker-push:
$(SUDO) docker push $(DOCKER_IMAGE):$(VERSION)
Expand Down
17 changes: 12 additions & 5 deletions exp/services/ledgerexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,24 @@ Exports ledgers continuously starting from --start. In this mode, the end ledger
ledgerexporter --start <start_ledger> --config-file <config_file_path>
```

#### Resumability:
Exporting a ledger range can be optimized further by enabling resumability if the remote data store supports it.

Starts exporting from a specified number of ledgers before the latest ledger sequence number on the network.
```bash
ledgerexporter --from-last <number_of_ledgers> --config-file <config_file_path>
```
By default, resumability is disabled, `--resume false`

When enabled, `--resume true`, ledgerexporter will search the remote data store within the requested range, looking for the oldest absent ledger sequence number within range. If abscence is detected, the export range is narrowed to `--start <absent_ledger_sequence>`.
This feature requires all ledgers to be present on the remote data store for some (possibly empty) prefix of the requested range and then absent for the (possibly empty) remainder.

### Configuration (toml):

```toml
network = "testnet" # Options: `testnet` or `pubnet`
destination_url = "gcs://your-bucket-name"

[datastore_config]
type = "GCS"

[datastore_config.params]
destination_bucket_path = "your-bucket-name/<optional_subpaths>"

[exporter_config]
ledgers_per_file = 64
Expand Down
10 changes: 9 additions & 1 deletion exp/services/ledgerexporter/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
network = "testnet"
destination_url = "gcs://exporter-test/ledgers"

[datastore_config]
type = "GCS"

[datastore_config.params]
destination_bucket_path = "exporter-test/ledgers"

[exporter_config]
ledgers_per_file = 1
files_per_partition = 64000

[stellar_core_config]
stellar_core_binary_path = "/usr/local/bin/stellar-core"

11 changes: 6 additions & 5 deletions exp/services/ledgerexporter/docker/start
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ files_per_partition="${FILES_PER_PARTITION:-64000}"
# Generate TOML configuration
cat <<EOF > config.toml
network = "${NETWORK}"
destination_url = "${ARCHIVE_TARGET}"
[datastore_config]
type = "GCS"
[datastore_config.params]
destination_bucket_path = "${ARCHIVE_TARGET}"
[exporter_config]
ledgers_per_file = $ledgers_per_file
Expand All @@ -29,10 +34,6 @@ EOF
if [[ -n "$START" || -n "$END" ]]; then
echo "START: $START END: $END"
/usr/bin/ledgerexporter --config-file config.toml --start $START --end $END
# Check if FROM_LAST variable is set
elif [[ -n "$FROM_LAST" ]]; then
echo "FROM_LAST: $FROM_LAST"
/usr/bin/ledgerexporter --config-file config.toml --from-last $FROM_LAST
else
echo "Error: No ledger range provided."
exit 1
Expand Down
155 changes: 117 additions & 38 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stellar/go/historyarchive"

"github.com/stellar/go/ingest/ledgerbackend"
_ "github.com/stellar/go/network"
Expand All @@ -24,41 +25,118 @@ var (
logger = log.New().WithField("service", "ledger-exporter")
)

func NewDataAlreadyExportedError(Start uint32, End uint32) *DataAlreadyExportedError {
return &DataAlreadyExportedError{
Start: Start,
End: End,
}
}

type DataAlreadyExportedError struct {
Start uint32
End uint32
}

func (m DataAlreadyExportedError) Error() string {
return fmt.Sprintf("For export ledger range start=%d, end=%d, the remote storage has all the data, there is no need to continue export", m.Start, m.End)
}

func NewInvalidDataStoreError(LedgerSequence uint32, LedgersPerFile uint32) *InvalidDataStoreError {
return &InvalidDataStoreError{
LedgerSequence: LedgerSequence,
LedgersPerFile: LedgersPerFile,
}
}

type InvalidDataStoreError struct {
LedgerSequence uint32
LedgersPerFile uint32
}

func (m InvalidDataStoreError) Error() string {
return fmt.Sprintf("The remote data store has inconsistent data, "+
"a resumable starting ledger of %v was identified, "+
"but that is not aligned to expected ledgers-per-file of %v. use '--resume false' to bypass",
m.LedgerSequence, m.LedgersPerFile)
}

type App struct {
config Config
config *Config
ledgerBackend ledgerbackend.LedgerBackend
dataStore DataStore
exportManager *ExportManager
uploader Uploader
flags Flags
prometheusRegistry *prometheus.Registry
}

func NewApp() *App {
func NewApp(flags Flags) *App {
logger.SetLevel(log.DebugLevel)

config := Config{}
err := config.LoadConfig()
logFatalIf(err, "Could not load configuration")

app := &App{config: config, prometheusRegistry: prometheus.NewRegistry()}
app.prometheusRegistry.MustRegister(
registry := prometheus.NewRegistry()
registry.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{Namespace: "ledger_exporter"}),
collectors.NewGoCollector(),
)
app := &App{flags: flags, prometheusRegistry: registry}
return app
}

func (a *App) init(ctx context.Context) {
a.dataStore = mustNewDataStore(ctx, a.config)
a.ledgerBackend = mustNewLedgerBackend(ctx, a.config, a.prometheusRegistry)
func (a *App) init(ctx context.Context) error {
var err error
var archive historyarchive.ArchiveInterface

if a.config, err = NewConfig(ctx, a.flags); err != nil {
return errors.Wrap(err, "Could not load configuration")
}
if archive, err = createHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil {
return err
}
a.config.ValidateAndSetLedgerRange(ctx, archive)

if a.dataStore, err = NewDataStore(ctx, a.config.DataStoreConfig, a.config.Network); err != nil {
return errors.Wrap(err, "Could not connect to destination data store")
}
if a.config.Resume {
if err = a.applyResumability(ctx,
NewResumableManager(a.dataStore, a.config.Network, a.config.LedgerBatchConfig, archive)); err != nil {
return err
}
}

logger.Infof("Final computed ledger range for backend retrieval and export, start=%d, end=%d", a.config.StartLedger, a.config.EndLedger)

if a.ledgerBackend, err = newLedgerBackend(ctx, a.config, a.prometheusRegistry); err != nil {
return err
}

// TODO: make number of upload workers configurable instead of hard coding it to 1
queue := NewUploadQueue(1, a.prometheusRegistry)
a.exportManager = NewExportManager(a.config.ExporterConfig, a.ledgerBackend, queue, a.prometheusRegistry)
a.uploader = NewUploader(
a.dataStore,
queue,
a.prometheusRegistry,
)
if a.exportManager, err = NewExportManager(a.config.LedgerBatchConfig, a.ledgerBackend, queue, a.prometheusRegistry); err != nil {
return err
}
a.uploader = NewUploader(a.dataStore, queue, a.prometheusRegistry)

return nil
}

func (a *App) applyResumability(ctx context.Context, resumableManager ResumableManager) error {
absentLedger, ok, err := resumableManager.FindStart(ctx, a.config.StartLedger, a.config.EndLedger)
if err != nil {
return err
}
if !ok {
return NewDataAlreadyExportedError(a.config.StartLedger, a.config.EndLedger)
}

// TODO - evaluate a more robust validation of remote data for ledgers-per-file consistency
// this assumes ValidateAndSetLedgerRange() has conditioned the a.config.StartLedger to be at least > 1
if absentLedger > 2 && absentLedger != a.config.LedgerBatchConfig.GetSequenceNumberStartBoundary(absentLedger) {
return NewInvalidDataStoreError(absentLedger, a.config.LedgerBatchConfig.LedgersPerFile)
}
logger.Infof("For export ledger range start=%d, end=%d, the remote storage has some of this data already, will resume at later start ledger of %d", a.config.StartLedger, a.config.EndLedger, absentLedger)
a.config.StartLedger = absentLedger

return nil
}

func (a *App) close() {
Expand Down Expand Up @@ -92,7 +170,15 @@ func (a *App) Run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a.init(ctx)
if err := a.init(ctx); err != nil {
var dataAlreadyExported DataAlreadyExportedError
if errors.As(err, &dataAlreadyExported) {
logger.Info(err.Error())
logger.Info("Shutting down ledger-exporter")
return
}
logger.WithError(err).Fatal("Stopping ledger-exporter")
}
defer a.close()

var wg sync.WaitGroup
Expand Down Expand Up @@ -133,22 +219,20 @@ func (a *App) Run() {
logger.Info("Shutting down ledger-exporter")
}

func mustNewDataStore(ctx context.Context, config Config) DataStore {
dataStore, err := NewDataStore(ctx, fmt.Sprintf("%s/%s", config.DestinationURL, config.Network))
logFatalIf(err, "Could not connect to destination data store")
return dataStore
}

// mustNewLedgerBackend Creates and initializes captive core ledger backend
// newLedgerBackend Creates and initializes captive core ledger backend
// Currently, only supports captive-core as ledger backend
func mustNewLedgerBackend(ctx context.Context, config Config, prometheusRegistry *prometheus.Registry) ledgerbackend.LedgerBackend {
captiveConfig := config.GenerateCaptiveCoreConfig()
func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
captiveConfig, err := config.GenerateCaptiveCoreConfig()
if err != nil {
return nil, err
}

var backend ledgerbackend.LedgerBackend
var err error
// Create a new captive core backend
backend, err = ledgerbackend.NewCaptive(captiveConfig)
logFatalIf(err, "Failed to create captive-core instance")
if err != nil {
return nil, errors.Wrap(err, "Failed to create captive-core instance")
}
backend = ledgerbackend.WithMetrics(backend, prometheusRegistry, "ledger_exporter")

var ledgerRange ledgerbackend.Range
Expand All @@ -158,13 +242,8 @@ func mustNewLedgerBackend(ctx context.Context, config Config, prometheusRegistry
ledgerRange = ledgerbackend.BoundedRange(config.StartLedger, config.EndLedger)
}

err = backend.PrepareRange(ctx, ledgerRange)
logFatalIf(err, "Could not prepare captive core ledger backend")
return backend
}

func logFatalIf(err error, message string, args ...interface{}) {
if err != nil {
logger.WithError(err).Fatalf(message, args...)
if err = backend.PrepareRange(ctx, ledgerRange); err != nil {
return nil, errors.Wrap(err, "Could not prepare captive core ledger backend")
}
return backend, nil
}
Loading

0 comments on commit 9808f37

Please sign in to comment.