Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/lighthorizon' into lite_search…
Browse files Browse the repository at this point in the history
…_account
  • Loading branch information
sreuland committed Jul 21, 2022
2 parents ff5b145 + 8c9eec3 commit 223626e
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 8 deletions.
4 changes: 4 additions & 0 deletions exp/lighthorizon/index/backend/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type FileBackend struct {
}

func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) {
if parallel <= 0 {
parallel = 1
}

return &FileBackend{
dir: dir,
parallel: parallel,
Expand Down
9 changes: 7 additions & 2 deletions exp/lighthorizon/index/backend/parallel_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f

wg.Add(1)
go func() {
// forces this async func to be waited on also, otherwise the outer
// method returns before this finishes.
defer wg.Done()

for account, indexes := range allIndexes {
batches <- &batch{
account: account,
Expand All @@ -43,15 +46,17 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f
defer wg.Done()
for batch := range batches {
if err := f(batch); err != nil {
log.Error(err)
log.Errorf("Error occurred writing batch: %v, retrying...", err)
time.Sleep(50 * time.Millisecond)
batches <- batch
continue
}

nwritten := atomic.AddUint64(&written, 1)
if nwritten%1000 == 0 {
log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, len(allIndexes), (float64(nwritten)/float64(len(allIndexes)))*100)
log.Infof("Writing indexes... %d/%d %.2f%%", nwritten,
len(allIndexes),
(float64(nwritten)/float64(len(allIndexes)))*100)
}

if nwritten == uint64(len(allIndexes)) {
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func BuildIndices(
}

nprocessed := atomic.AddUint64(&processed, uint64(count))
if nprocessed%19 == 0 {
if nprocessed%97 == 0 {
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
}

Expand Down
10 changes: 5 additions & 5 deletions exp/lighthorizon/index/cmd/reduce.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# Combines indices that were built separately in different folders into a single
# set of indices.
Expand All @@ -22,7 +22,7 @@ fi

if [[ ! -d "$2" ]]; then
echo "Warning: index dest ('$2') does not exist, creating..."
mkdir -p $2
mkdir -p "$2"
fi

MAP_JOB_COUNT=$(ls $1 | grep -E 'job_[0-9]+' | wc -l)
Expand Down Expand Up @@ -50,8 +50,8 @@ do
AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$MAP_JOB_COUNT \
REDUCE_JOB_COUNT=$REDUCE_JOB_COUNT WORKER_COUNT=4 \
INDEX_SOURCE_ROOT=file://$1 INDEX_TARGET=file://$2 \
./reduce &
timeout -k 30s 10s ./reduce &

echo "pid=$!"
pids+=($!)
done
Expand All @@ -72,4 +72,4 @@ done

rm ./reduce # cleanup
echo "All jobs succeeded!"
exit 0
exit 0
4 changes: 4 additions & 0 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ func Connect(backendUrl string) (Store, error) {
}

func ConnectWithConfig(config StoreConfig) (Store, error) {
if config.Workers <= 0 {
config.Workers = 1
}

parsed, err := url.Parse(config.Url)
if err != nil {
return nil, err
Expand Down

0 comments on commit 223626e

Please sign in to comment.