From 64efc32000b80866f887bc4c210c150f0a76bfa6 Mon Sep 17 00:00:00 2001 From: urvisavla Date: Mon, 14 Oct 2024 09:56:41 -0700 Subject: [PATCH] services/horizon: Remove --parallel-job-size config parameter used for reingestion. (#5484) * Remove --parallel-job-size config parameter used for reingestion. * Introduce minimum and maximum batch sizes for parallel workers. The min batch size is set to the HistoryCheckpointLedgerInterval for both Captive Core and Buffered Storage backends. The max batch size is larger for the Captive Core backend as compared to the Buffered Storage backend. These batch sizes are estimated to optimize throughput for each ledger backend. --- .../buffered_storage_backend_test.go | 44 ++++++ ingest/ledgerbackend/ledger_buffer.go | 6 + services/horizon/CHANGELOG.md | 8 ++ services/horizon/cmd/db.go | 34 ++--- services/horizon/cmd/db_test.go | 55 +------- services/horizon/internal/ingest/main.go | 10 ++ services/horizon/internal/ingest/parallel.go | 54 +++++--- .../horizon/internal/ingest/parallel_test.go | 129 ++++++++++++++---- 8 files changed, 216 insertions(+), 124 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index 0d461cff07..fe3ca0266c 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) { assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) } +func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) { + startLedger := uint32(10) + endLedger := uint32(30) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 10 + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange) + assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50) + assert.NoError(t, err) + + for i := startLedger; i < endLedger; i++ { + lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background()) + assert.NoError(t, err) + assert.Equal(t, xdr.Uint32(i), lcm.StartSequence) + } + assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) +} + +func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) { + startLedger := uint32(1) + endLedger := uint32(15) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 100 + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange) + assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50) + assert.NoError(t, err) + + for i := startLedger; i < endLedger; i++ { + lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background()) + assert.NoError(t, err) + assert.Equal(t, xdr.Uint32(i), lcm.StartSequence) + } + assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) +} + func TestBSBGetLatestLedgerSequence(t *testing.T) { startLedger := uint32(3) endLedger := uint32(5) diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go index d23bf0bfbd..7ee9dda083 100644 --- a/ingest/ledgerbackend/ledger_buffer.go +++ 
b/ingest/ledgerbackend/ledger_buffer.go @@ -13,6 +13,8 @@ import ( "github.com/stellar/go/support/collections/heap" "github.com/stellar/go/support/compressxdr" "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/ordered" + "github.com/stellar/go/xdr" ) @@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu less := func(a, b ledgerBatchObject) bool { return a.startLedger < b.startLedger } + // ensure BufferSize does not exceed the total range + if ledgerRange.bounded { + bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1)) + } pq := heap.New(less, int(bsb.config.BufferSize)) ledgerBuffer := &ledgerBuffer{ diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 6a8b0a708d..e3af62d580 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Pending + +### Breaking Changes + +- The `--parallel-job-size` configuration parameter for the `stellar-horizon db reingest` command has been removed. The job size is now determined automatically based on the number of workers (configuration parameter `--parallel-workers`), distributing the range equally among them. The minimum job size remains 64 ledgers, and the start and end of each ledger range are rounded to the nearest checkpoint. ([5484](https://github.com/stellar/go/pull/5484)) ## 2.32.0 diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 92a732e002..58c096a782 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -42,7 +42,6 @@ var ( dbDetectGapsCmd *cobra.Command reingestForce bool parallelWorkers uint - parallelJobSize uint32 retries uint retryBackoffSeconds uint ledgerBackendStr string @@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions { FlagDefault: uint(1), Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", }, - { - Name: "parallel-job-size", - ConfigKey: &parallelJobSize, - OptType: types.Uint32, - Required: false, - FlagDefault: uint32(100000), - Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", - }, { Name: "retries", ConfigKey: &retries, @@ -178,7 +169,7 @@ func ingestRangeCmdOpts() support.ConfigOptions { var dbReingestRangeCmdOpts = ingestRangeCmdOpts() var dbFillGapsCmdOpts = ingestRangeCmdOpts() -func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error { +func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, minBatchSize, maxBatchSize uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error { var err error if reingestForce && parallelWorkers > 1 { @@ -186,9 +177,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, } maxLedgersPerFlush := ingest.MaxLedgersPerFlush - if parallelJobSize < maxLedgersPerFlush { - maxLedgersPerFlush = parallelJobSize - } ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, @@ -214,15 +202,12 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, } if parallelWorkers > 1 { - system, systemErr := ingest.NewParallelSystems(ingestConfig,
parallelWorkers) + system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers, minBatchSize, maxBatchSize) if systemErr != nil { return systemErr } - return system.ReingestRange( - ledgerRanges, - parallelJobSize, - ) + return system.ReingestRange(ledgerRanges) } system, systemErr := ingest.NewSystem(ingestConfig) @@ -479,6 +464,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor } } + maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize var err error var storageBackendConfig ingest.StorageBackendConfig options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} @@ -486,12 +472,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { return err } - // when using buffered storage, performance observations have noted optimal parallel batch size - // of 100, apply that as default if the flag was absent. - if !viper.IsSet("parallel-job-size") { - parallelJobSize = 100 - } options.NoCaptiveCore = true + maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize } if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { @@ -501,6 +483,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, reingestForce, parallelWorkers, + ingest.MinBatchSize, + maxBatchSize, *horizonConfig, storageBackendConfig, ) @@ -541,6 +525,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor withRange = true } + maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize var err error var storageBackendConfig ingest.StorageBackendConfig options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} @@ -549,6 +534,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor return err } options.NoCaptiveCore = true + maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize } if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { @@ -569,7 +555,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor hlog.Infof("found gaps %v", gaps) } - return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig) + return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, ingest.MinBatchSize, maxBatchSize, *horizonConfig, storageBackendConfig) }, } diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go index 6a00576bd3..a2dd5f014c 100644 --- a/services/horizon/cmd/db_test.go +++ b/services/horizon/cmd/db_test.go @@ -25,7 +25,7 @@ type DBCommandsTestSuite struct { } func (s *DBCommandsTestSuite) SetupSuite() { - runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, + runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, uint, uint, horizon.Config, ingest.StorageBackendConfig) error { return nil } @@ -45,66 +45,19 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) { s.rootCmd = NewRootCmd() } -func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { +func (s *DBCommandsTestSuite) TestInvalidParameterParallelJobSize() { s.rootCmd.SetArgs([]string{ "db", "reingest", "range", "--db-url", s.db.DSN, "--network", "testnet", "--parallel-workers", "2", + "--parallel-job-size", "10", "--ledgerbackend", "datastore", "--datastore-config", 
"../internal/ingest/testdata/config.storagebackend.toml", "2", "10"}) - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(100)) -} - -func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--stellar-core-binary-path", "/test/core/bin/path", - "--parallel-workers", "2", - "--ledgerbackend", "captive-core", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(100_000)) -} - -func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--stellar-core-binary-path", "/test/core/bin/path", - "--parallel-workers", "2", - "--parallel-job-size", "5", - "--ledgerbackend", "captive-core", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(5)) -} - -func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--parallel-workers", "2", - "--parallel-job-size", "5", - "--ledgerbackend", "datastore", - "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(5)) + require.Equal(s.T(), "unknown flag: --parallel-job-size", s.rootCmd.Execute().Error()) } func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() { diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 63ee7ba457..38f9fe1d3a 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -100,6 +100,16 @@ func (s LedgerBackendType) String() string { return "" } +const ( + HistoryCheckpointLedgerInterval uint = 64 + // MinBatchSize is the minimum batch size for reingestion + MinBatchSize uint = HistoryCheckpointLedgerInterval + // MaxBufferedStorageBackendBatchSize is the maximum batch size for Buffered Storage reingestion + MaxBufferedStorageBackendBatchSize uint = 200 * HistoryCheckpointLedgerInterval + // MaxCaptiveCoreBackendBatchSize is the maximum batch size for Captive Core reingestion + MaxCaptiveCoreBackendBatchSize uint = 20_000 * HistoryCheckpointLedgerInterval +) + type StorageBackendConfig struct { DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"` BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"` diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index 4f07c21cc4..a2a641c5cf 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -8,11 +8,7 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" -) - -const ( - historyCheckpointLedgerInterval = 64 - minBatchSize = historyCheckpointLedgerInterval + "github.com/stellar/go/support/ordered" ) type rangeError struct { @@ -27,23 +23,32 @@ func (e rangeError) Error() string { type ParallelSystems struct { config Config workerCount uint + minBatchSize uint + maxBatchSize uint systemFactory func(Config) (System, error) } -func 
NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { +func NewParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint) (*ParallelSystems, error) { // Leaving this because used in tests, will update after a code review. - return newParallelSystems(config, workerCount, NewSystem) + return newParallelSystems(config, workerCount, minBatchSize, maxBatchSize, NewSystem) } // private version of NewParallel systems, allowing to inject a mock system -func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { +func newParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { if workerCount < 1 { return nil, errors.New("workerCount must be > 0") } - + if minBatchSize != 0 && minBatchSize < HistoryCheckpointLedgerInterval { + return nil, fmt.Errorf("minBatchSize must be at least %d", HistoryCheckpointLedgerInterval) + } + if minBatchSize != 0 && maxBatchSize != 0 && maxBatchSize < minBatchSize { + return nil, errors.New("maxBatchSize cannot be less than minBatchSize") + } return &ParallelSystems{ config: config, workerCount: workerCount, + maxBatchSize: maxBatchSize, + minBatchSize: minBatchSize, systemFactory: systemFactory, }, nil } @@ -112,20 +117,27 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, } return lowestLedger } +func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) uint32 { + // calculate the initial batch size based on available workers + batchSize := rangeSize / uint32(ps.workerCount) + + // ensure the batch size meets min threshold + if ps.minBatchSize > 0 { + batchSize = ordered.Max(batchSize, uint32(ps.minBatchSize)) + } -func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { - batchSize := batchSizeSuggestion - if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { - // let's try to make use of all the workers - batchSize = rangeSize / uint32(workerCount) + // ensure the batch size does not exceed max threshold + if ps.maxBatchSize > 0 { + batchSize = ordered.Min(batchSize, uint32(ps.maxBatchSize)) } - // Use a minimum batch size to make it worth it in terms of overhead - if batchSize < minBatchSize { - batchSize = minBatchSize + + // round down to the nearest multiple of HistoryCheckpointLedgerInterval + if batchSize > uint32(HistoryCheckpointLedgerInterval) { + return batchSize / uint32(HistoryCheckpointLedgerInterval) * uint32(HistoryCheckpointLedgerInterval) } - // Also, round the batch size to the closest, lower or equal 64 multiple - return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval + // HistoryCheckpointLedgerInterval is the minimum batch size.
+ return uint32(HistoryCheckpointLedgerInterval) } func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 { @@ -136,9 +148,9 @@ return sum } -func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error { +func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error { var ( - batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount) + batchSize = ps.calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges)) reingestJobQueue = make(chan history.LedgerRange) wg sync.WaitGroup diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go index 8004a4048c..f011f51021 100644 --- a/services/horizon/internal/ingest/parallel_test.go +++ b/services/horizon/internal/ingest/parallel_test.go @@ -1,6 +1,8 @@ package ingest import ( + "fmt" + "math" "math/rand" "sort" "sync" @@ -15,13 +17,88 @@ import ( ) func TestCalculateParallelLedgerBatchSize(t *testing.T) { - assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(20096, 64, 1)) + config := Config{} + result := &mockSystem{} + factory := func(c Config) (System, error) { + return result, nil + } + + // worker count 0 + system, err := newParallelSystems(config, 0, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) + assert.EqualError(t, err, "workerCount must be > 0") + + // worker count 1, min batch size smaller than HistoryCheckpointLedgerInterval + system, err = newParallelSystems(config, 1, 50, 200, factory) + assert.EqualError(t, err, fmt.Sprintf("minBatchSize must be at least %d", HistoryCheckpointLedgerInterval)) + + // worker count 1, max batch size smaller than min batch size + system, err = newParallelSystems(config, 1, 5000, 200, factory) + assert.EqualError(t, err, "maxBatchSize cannot be less than minBatchSize") + + // worker count 1, captive core batch size + system, _ = newParallelSystems(config, 1, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) + assert.Equal(t, uint32(MaxCaptiveCoreBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxCaptiveCoreBackendBatchSize)+10)) + assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0)) + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10090)) // round down + + // worker count 1, buffered storage batch size + system, _ = newParallelSystems(config, 1, MinBatchSize, MaxBufferedStorageBackendBatchSize, factory) + assert.Equal(t, uint32(MaxBufferedStorageBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxBufferedStorageBackendBatchSize)+10)) + assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0)) + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple + assert.Equal(t, uint32(10048),
system.calculateParallelLedgerBatchSize(10090)) // round down + + // worker count 1, no min/max batch size + system, _ = newParallelSystems(config, 1, 0, 0, factory) + assert.Equal(t, uint32(20096), system.calculateParallelLedgerBatchSize(20096)) // exact multiple + assert.Equal(t, uint32(20032), system.calculateParallelLedgerBatchSize(20090)) // round down + + // worker count 1, min/max batch size + system, _ = newParallelSystems(config, 1, 64, 20000, factory) + assert.Equal(t, uint32(19968), system.calculateParallelLedgerBatchSize(20096)) // round down + system, _ = newParallelSystems(config, 1, 64, 30000, factory) + assert.Equal(t, uint32(20096), system.calculateParallelLedgerBatchSize(20096)) // exact multiple + + // Tests for worker count 2 + + // no min/max batch size + system, _ = newParallelSystems(config, 2, 0, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(60)) // range smaller than 64 + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(128)) // exact multiple + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096)) + + // range larger than max batch size + system, _ = newParallelSystems(config, 2, 64, 10000, factory) + assert.Equal(t, uint32(9984), system.calculateParallelLedgerBatchSize(20096)) // round down + + // range smaller than min batch size + system, _ = newParallelSystems(config, 2, 64, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(50)) // min batch size + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096)) // exact multiple + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100)) // min batch size + + // batch size equal to min + system, _ = newParallelSystems(config, 2, 100, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100)) // round down + + // equal min/max batch size + system, _ = newParallelSystems(config, 2, 5000, 5000, factory) + assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) // round down + + // worker count 3 + system, _ = newParallelSystems(config, 3, 64, 7000, factory) + assert.Equal(t, uint32(6656), system.calculateParallelLedgerBatchSize(20096)) + + // worker count 4 + system, _ = newParallelSystems(config, 4, 64, 20000, factory) + assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) //round down + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(64)) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(2)) + + // max possible workers + system, _ = newParallelSystems(config, math.MaxUint32, 0, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(math.MaxUint32)) } func TestParallelReingestRange(t *testing.T) { @@ -43,31 +120,27 @@ func TestParallelReingestRange(t *testing.T) { factory := func(c Config) (System, error) { return result, nil } - system, err := newParallelSystems(config, 3, factory) + system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) assert.NoError(t, err) sort.Slice(rangesCalled, func(i, j int) bool { return rangesCalled[i].StartSequence < rangesCalled[j].StartSequence }) expected := []history.LedgerRange{ - {StartSequence: 1, EndSequence: 256}, {StartSequence: 257, EndSequence: 512}, {StartSequence: 513, EndSequence: 768}, 
{StartSequence: 769, EndSequence: 1024}, {StartSequence: 1025, EndSequence: 1280}, - {StartSequence: 1281, EndSequence: 1536}, {StartSequence: 1537, EndSequence: 1792}, {StartSequence: 1793, EndSequence: 2048}, {StartSequence: 2049, EndSequence: 2050}, + {StartSequence: 1, EndSequence: 640}, {StartSequence: 641, EndSequence: 1280}, {StartSequence: 1281, EndSequence: 1920}, {StartSequence: 1921, EndSequence: 2050}, } assert.Equal(t, expected, rangesCalled) rangesCalled = nil - system, err = newParallelSystems(config, 1, factory) + system, err = newParallelSystems(config, 1, 0, 0, factory) assert.NoError(t, err) result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once() - err = system.ReingestRange([]history.LedgerRange{{1, 1024}}, 64) + err = system.ReingestRange([]history.LedgerRange{{1, 1024}}) result.AssertExpectations(t) expected = []history.LedgerRange{ - {StartSequence: 1, EndSequence: 64}, {StartSequence: 65, EndSequence: 128}, {StartSequence: 129, EndSequence: 192}, {StartSequence: 193, EndSequence: 256}, {StartSequence: 257, EndSequence: 320}, - {StartSequence: 321, EndSequence: 384}, {StartSequence: 385, EndSequence: 448}, {StartSequence: 449, EndSequence: 512}, {StartSequence: 513, EndSequence: 576}, {StartSequence: 577, EndSequence: 640}, - {StartSequence: 641, EndSequence: 704}, {StartSequence: 705, EndSequence: 768}, {StartSequence: 769, EndSequence: 832}, {StartSequence: 833, EndSequence: 896}, {StartSequence: 897, EndSequence: 960}, - {StartSequence: 961, EndSequence: 1024}, + {StartSequence: 1, EndSequence: 1024}, } assert.NoError(t, err) assert.Equal(t, expected, rangesCalled) @@ -77,19 +150,19 @@ func TestParallelReingestRangeError(t *testing.T) { config := Config{} result := &mockSystem{} // Fail on the second range - result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Return(errors.New("failed because of foo")).Once() + result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Return(errors.New("failed because of foo")).Once() result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(nil) - result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1537)).Return(nil).Once() + result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once() factory := func(c Config) (System, error) { return result, nil } - system, err := newParallelSystems(config, 3, factory) + system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) assert.Error(t, err) - assert.Equal(t, "job failed, recommended restart range: [1537, 2050]: error when processing [1537, 1792] range: failed because of foo", err.Error()) + assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error()) } func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { @@ -98,27 +171,27 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { wg.Add(1) result := &mockSystem{} // Fail on an lower subrange after the first error - result.On("ReingestRange", []history.LedgerRange{{1025, 1280}}, false, false).Run(func(mock.Arguments) { + result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Run(func(mock.Arguments) { // Wait for a more recent range to error 
wg.Wait() // This sleep should help making sure the result of this range is processed later than the one below // (there are no guarantees without instrumenting ReingestRange(), but that's too complicated) time.Sleep(50 * time.Millisecond) }).Return(errors.New("failed because of foo")).Once() - result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Run(func(mock.Arguments) { + result.On("ReingestRange", []history.LedgerRange{{1281, 1920}}, false, false).Run(func(mock.Arguments) { wg.Done() }).Return(errors.New("failed because of bar")).Once() result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(error(nil)) - result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1025)).Return(nil).Once() + result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once() factory := func(c Config) (System, error) { return result, nil } - system, err := newParallelSystems(config, 3, factory) + system, err := newParallelSystems(config, 3, 0, 0, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) assert.Error(t, err) - assert.Equal(t, "job failed, recommended restart range: [1025, 2050]: error when processing [1025, 1280] range: failed because of foo", err.Error()) + assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error()) }
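Note on the sizing rule the patch introduces: calculateParallelLedgerBatchSize divides the total range evenly across the workers, clamps the result to the configured minimum and maximum batch sizes when they are set, and rounds down to a multiple of the 64-ledger checkpoint interval, never going below one checkpoint. A minimal, standalone Go sketch of that rule follows; the helper name batchSize, the main driver, and the example values are illustrative only and are not part of this patch.

package main

import "fmt"

// checkpointInterval mirrors HistoryCheckpointLedgerInterval (64 ledgers).
const checkpointInterval = uint32(64)

// batchSize sketches the rule used by calculateParallelLedgerBatchSize:
// split the range across workers, clamp to [minSize, maxSize] when non-zero,
// and round down to a checkpoint multiple, never below one checkpoint.
func batchSize(rangeSize, workers, minSize, maxSize uint32) uint32 {
	size := rangeSize / workers
	if minSize > 0 && size < minSize {
		size = minSize
	}
	if maxSize > 0 && size > maxSize {
		size = maxSize
	}
	if size > checkpointInterval {
		return size / checkpointInterval * checkpointInterval
	}
	return checkpointInterval
}

func main() {
	// 20096 ledgers across 4 workers: 5024 per worker, rounded down to 4992.
	fmt.Println(batchSize(20096, 4, 64, 20000))
	// A tiny range still gets one checkpoint-sized batch of 64 ledgers.
	fmt.Println(batchSize(2, 4, 64, 20000))
}

Both example cases correspond to the worker-count-4 assertions in TestCalculateParallelLedgerBatchSize above.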