From 033de79752d64a47662d4c0d8d8ab1670319e6a9 Mon Sep 17 00:00:00 2001 From: George Date: Wed, 20 Jul 2022 12:44:51 -0700 Subject: [PATCH 1/2] exp/lighthorizon: Set a default number of workers. (#4465) * Default to the number of CPUs if worker count isn't specified * Set a timeout on the reduce job to avoid test suite hanging indefinitely --- exp/lighthorizon/index/backend/file.go | 4 ++++ exp/lighthorizon/index/cmd/reduce.sh | 10 +++++----- exp/lighthorizon/index/connect.go | 5 +++++ 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/exp/lighthorizon/index/backend/file.go b/exp/lighthorizon/index/backend/file.go index dc7af997f2..2e25e1c1bc 100644 --- a/exp/lighthorizon/index/backend/file.go +++ b/exp/lighthorizon/index/backend/file.go @@ -20,6 +20,10 @@ type FileBackend struct { } func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) { + if parallel <= 0 { + parallel = 1 + } + return &FileBackend{ dir: dir, parallel: parallel, diff --git a/exp/lighthorizon/index/cmd/reduce.sh b/exp/lighthorizon/index/cmd/reduce.sh index 4d681b63f9..e89b91c474 100755 --- a/exp/lighthorizon/index/cmd/reduce.sh +++ b/exp/lighthorizon/index/cmd/reduce.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/usr/bin/env bash # # Combines indices that were built separately in different folders into a single # set of indices. @@ -22,7 +22,7 @@ fi if [[ ! -d "$2" ]]; then echo "Warning: index dest ('$2') does not exist, creating..." - mkdir -p $2 + mkdir -p "$2" fi MAP_JOB_COUNT=$(ls $1 | grep -E 'job_[0-9]+' | wc -l) @@ -50,8 +50,8 @@ do AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$MAP_JOB_COUNT \ REDUCE_JOB_COUNT=$REDUCE_JOB_COUNT WORKER_COUNT=4 \ INDEX_SOURCE_ROOT=file://$1 INDEX_TARGET=file://$2 \ - ./reduce & - + timeout -k 30s 10s ./reduce & + echo "pid=$!" pids+=($!) done @@ -72,4 +72,4 @@ done rm ./reduce # cleanup echo "All jobs succeeded!" -exit 0 \ No newline at end of file +exit 0 diff --git a/exp/lighthorizon/index/connect.go b/exp/lighthorizon/index/connect.go index f6ac6c64f0..4f8abc368c 100644 --- a/exp/lighthorizon/index/connect.go +++ b/exp/lighthorizon/index/connect.go @@ -4,6 +4,7 @@ import ( "fmt" "net/url" "path/filepath" + "runtime" "github.com/aws/aws-sdk-go/aws" @@ -15,6 +16,10 @@ func Connect(backendUrl string) (Store, error) { } func ConnectWithConfig(config StoreConfig) (Store, error) { + if config.Workers <= 0 { + config.Workers = uint32(runtime.NumCPU()) - 1 + } + parsed, err := url.Parse(config.Url) if err != nil { return nil, err From 8c9eec3d7efa4d97d8e7fb9b4d2edbba586290fd Mon Sep 17 00:00:00 2001 From: George Date: Thu, 21 Jul 2022 14:05:11 -0700 Subject: [PATCH 2/2] exp/lighthorizon: Fix the single-process index builder data race. (#4470) * Add synchronization for the work submission routine. Thank you @sreuland! Co-authored-by: shawn --- exp/lighthorizon/index/backend/parallel_flush.go | 11 +++++++++-- exp/lighthorizon/index/builder.go | 2 +- exp/lighthorizon/index/connect.go | 3 +-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/exp/lighthorizon/index/backend/parallel_flush.go b/exp/lighthorizon/index/backend/parallel_flush.go index 4f9b14e783..896155c98c 100644 --- a/exp/lighthorizon/index/backend/parallel_flush.go +++ b/exp/lighthorizon/index/backend/parallel_flush.go @@ -21,7 +21,12 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f batches := make(chan *batch, parallel) + wg.Add(1) go func() { + // forces this async func to be waited on also, otherwise the outer + // method returns before this finishes. + defer wg.Done() + for account, indexes := range allIndexes { batches <- &batch{ account: account, @@ -41,7 +46,7 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f defer wg.Done() for batch := range batches { if err := f(batch); err != nil { - log.Error(err) + log.Errorf("Error occurred writing batch: %v, retrying...", err) time.Sleep(50 * time.Millisecond) batches <- batch continue @@ -49,7 +54,9 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f nwritten := atomic.AddUint64(&written, 1) if nwritten%1000 == 0 { - log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, len(allIndexes), (float64(nwritten)/float64(len(allIndexes)))*100) + log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, + len(allIndexes), + (float64(nwritten)/float64(len(allIndexes)))*100) } if nwritten == uint64(len(allIndexes)) { diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index 3effef1e83..58f5da8f89 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -127,7 +127,7 @@ func BuildIndices( } nprocessed := atomic.AddUint64(&processed, uint64(count)) - if nprocessed%19 == 0 { + if nprocessed%97 == 0 { printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime) } diff --git a/exp/lighthorizon/index/connect.go b/exp/lighthorizon/index/connect.go index 4f8abc368c..8c6d90173e 100644 --- a/exp/lighthorizon/index/connect.go +++ b/exp/lighthorizon/index/connect.go @@ -4,7 +4,6 @@ import ( "fmt" "net/url" "path/filepath" - "runtime" "github.com/aws/aws-sdk-go/aws" @@ -17,7 +16,7 @@ func Connect(backendUrl string) (Store, error) { func ConnectWithConfig(config StoreConfig) (Store, error) { if config.Workers <= 0 { - config.Workers = uint32(runtime.NumCPU()) - 1 + config.Workers = 1 } parsed, err := url.Parse(config.Url)