diff --git a/exp/lighthorizon/index/backend/parallel_flush.go b/exp/lighthorizon/index/backend/parallel_flush.go index 4f9b14e783..896155c98c 100644 --- a/exp/lighthorizon/index/backend/parallel_flush.go +++ b/exp/lighthorizon/index/backend/parallel_flush.go @@ -21,7 +21,12 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f batches := make(chan *batch, parallel) + wg.Add(1) go func() { + // forces this async func to be waited on also, otherwise the outer + // method returns before this finishes. + defer wg.Done() + for account, indexes := range allIndexes { batches <- &batch{ account: account, @@ -41,7 +46,7 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f defer wg.Done() for batch := range batches { if err := f(batch); err != nil { - log.Error(err) + log.Errorf("Error occurred writing batch: %v, retrying...", err) time.Sleep(50 * time.Millisecond) batches <- batch continue @@ -49,7 +54,9 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f nwritten := atomic.AddUint64(&written, 1) if nwritten%1000 == 0 { - log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, len(allIndexes), (float64(nwritten)/float64(len(allIndexes)))*100) + log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, + len(allIndexes), + (float64(nwritten)/float64(len(allIndexes)))*100) } if nwritten == uint64(len(allIndexes)) { diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index 3effef1e83..58f5da8f89 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -127,7 +127,7 @@ func BuildIndices( } nprocessed := atomic.AddUint64(&processed, uint64(count)) - if nprocessed%19 == 0 { + if nprocessed%97 == 0 { printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime) } diff --git a/exp/lighthorizon/index/connect.go b/exp/lighthorizon/index/connect.go index 4f8abc368c..8c6d90173e 100644 --- a/exp/lighthorizon/index/connect.go +++ b/exp/lighthorizon/index/connect.go @@ -4,7 +4,6 @@ import ( "fmt" "net/url" "path/filepath" - "runtime" "github.com/aws/aws-sdk-go/aws" @@ -17,7 +16,7 @@ func Connect(backendUrl string) (Store, error) { func ConnectWithConfig(config StoreConfig) (Store, error) { if config.Workers <= 0 { - config.Workers = uint32(runtime.NumCPU()) - 1 + config.Workers = 1 } parsed, err := url.Parse(config.Url)