From 91c62f58b2357bcb2b70f3cc69c00d93aef76723 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 20 Jul 2022 14:56:43 -0700 Subject: [PATCH 1/3] "Fix" the data race by not doing any flushes until the end --- exp/lighthorizon/index/builder.go | 7 +------ exp/lighthorizon/index/connect.go | 3 +-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index 3effef1e83..8ae1181709 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -127,14 +127,9 @@ func BuildIndices( } nprocessed := atomic.AddUint64(&processed, uint64(count)) - if nprocessed%19 == 0 { + if nprocessed%97 == 0 { printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime) } - - // Upload indices once per checkpoint to save memory - if err := indexStore.Flush(); err != nil { - return errors.Wrap(err, "flushing indices failed") - } } return nil }) diff --git a/exp/lighthorizon/index/connect.go b/exp/lighthorizon/index/connect.go index 4f8abc368c..8c6d90173e 100644 --- a/exp/lighthorizon/index/connect.go +++ b/exp/lighthorizon/index/connect.go @@ -4,7 +4,6 @@ import ( "fmt" "net/url" "path/filepath" - "runtime" "github.com/aws/aws-sdk-go/aws" @@ -17,7 +16,7 @@ func Connect(backendUrl string) (Store, error) { func ConnectWithConfig(config StoreConfig) (Store, error) { if config.Workers <= 0 { - config.Workers = uint32(runtime.NumCPU()) - 1 + config.Workers = 1 } parsed, err := url.Parse(config.Url) From 1309a2b50b53bfe2b34dd3f700354ff7463824f9 Mon Sep 17 00:00:00 2001 From: shawn Date: Wed, 20 Jul 2022 17:16:21 -0700 Subject: [PATCH 2/3] Create parallel_flush.go `go test -v -race -run TestSingleProcess ./...` passes after this --- exp/lighthorizon/index/backend/parallel_flush.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/exp/lighthorizon/index/backend/parallel_flush.go b/exp/lighthorizon/index/backend/parallel_flush.go index 4f9b14e783..80a14fc9b8 100644 --- a/exp/lighthorizon/index/backend/parallel_flush.go +++ b/exp/lighthorizon/index/backend/parallel_flush.go @@ -21,7 +21,10 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f batches := make(chan *batch, parallel) + wg.Add(1) go func() { + // forces this async func to be waited on also, otherwise the outer method returns before this finishes. + defer wg.Done() for account, indexes := range allIndexes { batches <- &batch{ account: account, From 2c330877be0b94b6f83bfde97a76829cdeba34d0 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 21 Jul 2022 10:03:58 -0700 Subject: [PATCH 3/3] Add synchronization for the work submission routine. Thank you @sreuland! --- exp/lighthorizon/index/backend/parallel_flush.go | 8 ++++++-- exp/lighthorizon/index/builder.go | 5 +++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/exp/lighthorizon/index/backend/parallel_flush.go b/exp/lighthorizon/index/backend/parallel_flush.go index 4f9b14e783..ec8a199b43 100644 --- a/exp/lighthorizon/index/backend/parallel_flush.go +++ b/exp/lighthorizon/index/backend/parallel_flush.go @@ -21,7 +21,9 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f batches := make(chan *batch, parallel) + wg.Add(1) go func() { + defer wg.Done() for account, indexes := range allIndexes { batches <- &batch{ account: account, @@ -41,7 +43,7 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f defer wg.Done() for batch := range batches { if err := f(batch); err != nil { - log.Error(err) + log.Errorf("Error occurred writing batch: %v, retrying...", err) time.Sleep(50 * time.Millisecond) batches <- batch continue @@ -49,7 +51,9 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f nwritten := atomic.AddUint64(&written, 1) if nwritten%1000 == 0 { - log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, len(allIndexes), (float64(nwritten)/float64(len(allIndexes)))*100) + log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, + len(allIndexes), + (float64(nwritten)/float64(len(allIndexes)))*100) } if nwritten == uint64(len(allIndexes)) { diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index 8ae1181709..58f5da8f89 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -130,6 +130,11 @@ func BuildIndices( if nprocessed%97 == 0 { printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime) } + + // Upload indices once per checkpoint to save memory + if err := indexStore.Flush(); err != nil { + return errors.Wrap(err, "flushing indices failed") + } } return nil })