From 2b62f1453a38549af1c127b425e6cfacacd7f104 Mon Sep 17 00:00:00 2001
From: George Kudrayvtsev
Date: Wed, 29 Jun 2022 16:57:47 -0500
Subject: [PATCH 1/6] Add reduce test to ensure combining map jobs works

---
 .../cmd/{map_test.go => mapreduce_test.go}    | 41 ++++++++++++
 exp/lighthorizon/index/cmd/reduce.sh          | 64 +++++++++++++++++++
 2 files changed, 105 insertions(+)
 rename exp/lighthorizon/index/cmd/{map_test.go => mapreduce_test.go} (78%)
 create mode 100755 exp/lighthorizon/index/cmd/reduce.sh

diff --git a/exp/lighthorizon/index/cmd/map_test.go b/exp/lighthorizon/index/cmd/mapreduce_test.go
similarity index 78%
rename from exp/lighthorizon/index/cmd/map_test.go
rename to exp/lighthorizon/index/cmd/mapreduce_test.go
index 4c352425e3..a4c2419cbb 100644
--- a/exp/lighthorizon/index/cmd/map_test.go
+++ b/exp/lighthorizon/index/cmd/mapreduce_test.go
@@ -22,6 +22,45 @@ const (
 )
 
 func TestMap(t *testing.T) {
+	RunMapTest(t)
+}
+
+func TestReduce(t *testing.T) {
+	// First, map the index files like we normally would.
+	startLedger, endLedger, jobRoot := RunMapTest(t)
+	batchCount := (endLedger - startLedger + batchSize) / batchSize // ceil(ledgerCount / batchSize)
+
+	// Now that indices have been "map"ped, reduce them to a single store.
+
+	indexTarget := filepath.Join(t.TempDir(), "final-indices")
+	reduceTestCmd := exec.Command("./reduce.sh", jobRoot, indexTarget)
+	t.Logf("Running %d reduce jobs: %s", batchCount, reduceTestCmd.String())
+	stdout, err := reduceTestCmd.CombinedOutput()
+	t.Logf(string(stdout))
+	require.NoError(t, err)
+
+	// Then, build the *same* indices using the single-process tester.
+
+	t.Logf("Building baseline for ledger range [%d, %d]", startLedger, endLedger)
+	hashes, participants := IndexLedgerRange(t, txmetaSource, startLedger, endLedger)
+	require.NotNil(t, hashes)
+	require.NotNil(t, participants)
+
+	// Finally, compare the two to make sure the reduce job did what it's
+	// supposed to do.
+
+	indexStore, err := index.Connect("file://" + indexTarget)
+	require.NoError(t, err)
+	require.NotNil(t, indexStore)
+	stores := []index.Store{indexStore} // to reuse code: same as array of 1 store
+
+	assertParticipantsEqual(t, keysU32(participants), stores)
+	for account, checkpoints := range participants {
+		assertParticipantCheckpointsEqual(t, account, checkpoints, stores)
+	}
+}
+
+func RunMapTest(t *testing.T) (uint32, uint32, string) {
 	// Only file:// style URLs for the txmeta source are allowed while testing.
 	parsed, err := url.Parse(txmetaSource)
 	require.NoErrorf(t, err, "%s is not a valid URL", txmetaSource)
@@ -103,6 +142,8 @@ func TestMap(t *testing.T) {
 	for account, checkpoints := range participants {
 		assertParticipantCheckpointsEqual(t, account, checkpoints, stores)
 	}
+
+	return startLedger, endLedger, tempDir
 }
 
 func assertParticipantsEqual(t *testing.T,
diff --git a/exp/lighthorizon/index/cmd/reduce.sh b/exp/lighthorizon/index/cmd/reduce.sh
new file mode 100755
index 0000000000..a0e4e882ad
--- /dev/null
+++ b/exp/lighthorizon/index/cmd/reduce.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+
+# check parameters and their validity (types, existence, etc.)
+
+if [[ "$#" -ne "2" ]]; then
+    echo "Usage: $0 <index src root> <index dest>"
+    exit 1
+fi
+
+if [[ ! -d "$1" ]]; then
+    echo "Error: index src root ('$1') does not exist"
+    echo "Usage: $0 <index src root> <index dest>"
+    exit 1
+fi
+
+if [[ ! -d "$2" ]]; then
+    echo "Warning: index dest ('$2') does not exist, creating..."
+    mkdir -p $2
+fi
+
+
+go build -o reduce ./batch/reduce/...
+if [[ "$?" 
-ne "0" ]]; then + echo "Build failed" + exit 1 +fi + +JOB_COUNT=$(ls $1 | grep -E 'job_[0-9]+' | wc -l) +if [[ "$JOB_COUNT" -le "0" ]]; then + echo "No jobs in index src root ('$1') found." + exit 1 +fi + +pids=( ) +for (( i=0; i < $JOB_COUNT; i++ )) +do + echo -n "Creating job $i... " + + AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$JOB_COUNT \ + REDUCE_JOB_COUNT=$JOB_COUNT WORKER_COUNT=4 \ + INDEX_SOURCE_ROOT=file://$1 INDEX_TARGET=file://$2 \ + ./reduce & + + echo "pid=$!" + pids+=($!) +done + +sleep $JOB_COUNT + +# Check the status codes for all of the map processes. +for i in "${!pids[@]}"; do + pid=${pids[$i]} + echo -n "Checking job $i (pid=$pid)... " + if ! wait "$pid"; then + echo "failed" + exit 1 + else + echo "succeeded!" + fi +done + +rm ./reduce +echo "All jobs succeeded!" +exit 0 \ No newline at end of file From e01d1c6d92f02dffd567b4bc6c31409e7d625e53 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 29 Jun 2022 17:57:54 -0500 Subject: [PATCH 2/6] Various fixups, plus actually test that TOIDs are correct --- exp/lighthorizon/index/cmd/map.sh | 2 +- exp/lighthorizon/index/cmd/mapreduce_test.go | 35 +++++++++++++++++--- exp/lighthorizon/index/cmd/reduce.sh | 18 +++++----- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/exp/lighthorizon/index/cmd/map.sh b/exp/lighthorizon/index/cmd/map.sh index 9d7d6bfb25..4f7db034fe 100755 --- a/exp/lighthorizon/index/cmd/map.sh +++ b/exp/lighthorizon/index/cmd/map.sh @@ -66,7 +66,7 @@ fi pids=( ) for (( i=0; i < $BATCH_COUNT; i++ )) do - echo -n "Creating job $i... " + echo -n "Creating map job $i... " AWS_BATCH_JOB_ARRAY_INDEX=$i BATCH_SIZE=$BATCH_SIZE FIRST_CHECKPOINT=$FIRST \ TXMETA_SOURCE=file://$1 INDEX_TARGET=file://$2 WORKER_COUNT=1 \ diff --git a/exp/lighthorizon/index/cmd/mapreduce_test.go b/exp/lighthorizon/index/cmd/mapreduce_test.go index a4c2419cbb..17287d1fd8 100644 --- a/exp/lighthorizon/index/cmd/mapreduce_test.go +++ b/exp/lighthorizon/index/cmd/mapreduce_test.go @@ -1,6 +1,7 @@ package main_test import ( + "encoding/hex" "fmt" "io" "net/url" @@ -43,21 +44,20 @@ func TestReduce(t *testing.T) { t.Logf("Building baseline for ledger range [%d, %d]", startLedger, endLedger) hashes, participants := IndexLedgerRange(t, txmetaSource, startLedger, endLedger) - require.NotNil(t, hashes) - require.NotNil(t, participants) // Finally, compare the two to make sure the reduce job did what it's // supposed to do. indexStore, err := index.Connect("file://" + indexTarget) require.NoError(t, err) - require.NotNil(t, indexStore) stores := []index.Store{indexStore} // to reuse code: same as array of 1 store assertParticipantsEqual(t, keysU32(participants), stores) for account, checkpoints := range participants { assertParticipantCheckpointsEqual(t, account, checkpoints, stores) } + + assertTOIDsEqual(t, hashes, stores) } func RunMapTest(t *testing.T) (uint32, uint32, string) { @@ -117,8 +117,6 @@ func RunMapTest(t *testing.T) (uint32, uint32, string) { // Then, build the *same* indices using the single-process tester. t.Logf("Building baseline for ledger range [%d, %d]", startLedger, endLedger) hashes, participants := IndexLedgerRange(t, txmetaSource, startLedger, endLedger) - require.NotNil(t, hashes) - require.NotNil(t, participants) // Now, walk through the mapped indices and ensure that at least one of the // jobs reported the same indices for tx TOIDs and participation. 
@@ -143,6 +141,8 @@ func RunMapTest(t *testing.T) (uint32, uint32, string) { assertParticipantCheckpointsEqual(t, account, checkpoints, stores) } + assertTOIDsEqual(t, hashes, stores) + return startLedger, endLedger, tempDir } @@ -205,6 +205,31 @@ func assertParticipantCheckpointsEqual(t *testing.T, } } +func assertTOIDsEqual(t *testing.T, toids map[string]int64, stores []index.Store) { + for hash, toid := range toids { + rawHash := [32]byte{} + decodedHash, err := hex.DecodeString(hash) + require.NoError(t, err) + require.Lenf(t, decodedHash, 32, "invalid tx hash length") + copy(rawHash[:], decodedHash) + + found := false + for i, store := range stores { + storeToid, err := store.TransactionTOID(rawHash) + if err != nil { + require.ErrorIsf(t, err, io.EOF, + "only EOF errors are allowed (store %d, hash %s)", i, hash) + } else { + require.Equalf(t, toid, storeToid, + "TOIDs for tx 0x%s don't match (store %d)", hash, i) + found = true + } + } + + require.Truef(t, found, "TOID for tx 0x%s not found in stores", hash) + } +} + func keysU32(dict map[string][]uint32) []string { result := make([]string, 0, len(dict)) for key := range dict { diff --git a/exp/lighthorizon/index/cmd/reduce.sh b/exp/lighthorizon/index/cmd/reduce.sh index a0e4e882ad..fb85a6f677 100755 --- a/exp/lighthorizon/index/cmd/reduce.sh +++ b/exp/lighthorizon/index/cmd/reduce.sh @@ -18,26 +18,28 @@ if [[ ! -d "$2" ]]; then mkdir -p $2 fi - go build -o reduce ./batch/reduce/... if [[ "$?" -ne "0" ]]; then echo "Build failed" exit 1 fi -JOB_COUNT=$(ls $1 | grep -E 'job_[0-9]+' | wc -l) -if [[ "$JOB_COUNT" -le "0" ]]; then +MAP_JOB_COUNT=$(ls $1 | grep -E 'job_[0-9]+' | wc -l) +if [[ "$MAP_JOB_COUNT" -le "0" ]]; then echo "No jobs in index src root ('$1') found." exit 1 fi +REDUCE_JOB_COUNT=$MAP_JOB_COUNT + +echo "Coalescing $MAP_JOB_COUNT discovered job outputs from $1 into $2..." pids=( ) -for (( i=0; i < $JOB_COUNT; i++ )) +for (( i=0; i < $REDUCE_JOB_COUNT; i++ )) do - echo -n "Creating job $i... " + echo -n "Creating reduce job $i... " - AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$JOB_COUNT \ - REDUCE_JOB_COUNT=$JOB_COUNT WORKER_COUNT=4 \ + AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$MAP_JOB_COUNT \ + REDUCE_JOB_COUNT=$REDUCE_JOB_COUNT WORKER_COUNT=4 \ INDEX_SOURCE_ROOT=file://$1 INDEX_TARGET=file://$2 \ ./reduce & @@ -45,7 +47,7 @@ do pids+=($!) done -sleep $JOB_COUNT +sleep $REDUCE_JOB_COUNT # Check the status codes for all of the map processes. for i in "${!pids[@]}"; do From ec779b0328132ec5b7720d6ebea3caa3b119c038 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 29 Jun 2022 18:02:19 -0500 Subject: [PATCH 3/6] Bugfix: Transaction prefix loop should be inclusive --- exp/lighthorizon/index/cmd/batch/reduce/main.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go index ec2c68abe3..d5092673ff 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/main.go +++ b/exp/lighthorizon/index/cmd/batch/reduce/main.go @@ -253,19 +253,20 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { WithField("indexed", transactionsProcessed). 
WithField("skipped", transactionsSkipped) - for i := byte(0x00); i < 0xff; i++ { + for i := int(0x00); i <= 0xff; i++ { + b := byte(i) // can't loop over range bc overflow if i%97 == 0 { logger.Infof("%d transactions processed (%d skipped)", transactionsProcessed, transactionsSkipped) } - if !config.shouldProcessTx(i, routineIndex) { + if !config.shouldProcessTx(b, routineIndex) { transactionsSkipped++ continue } transactionsProcessed++ - prefix := hex.EncodeToString([]byte{i}) + prefix := hex.EncodeToString([]byte{b}) for k := uint32(0); k < config.MapJobCount; k++ { url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k)) From 27d9fe37742b4493aeefa91e4229dce2069b35e3 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 29 Jun 2022 18:07:35 -0500 Subject: [PATCH 4/6] Isolate loggers to individual processing "sections" --- .../index/cmd/batch/reduce/main.go | 126 ++++++++++-------- 1 file changed, 70 insertions(+), 56 deletions(-) diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go index d5092673ff..bd8cfc028a 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/main.go +++ b/exp/lighthorizon/index/cmd/batch/reduce/main.go @@ -159,66 +159,70 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { for j := uint32(0); j < config.Workers; j++ { go func(routineIndex uint32) { defer wg.Done() - logger := jobLogger. + accountLog := jobLogger. WithField("worker", routineIndex). - WithField("total", len(accounts)) - logger.Info("Started worker") + WithField("subservice", "accounts") + accountLog.Info("Started worker") var accountsProcessed, accountsSkipped uint64 for account := range workQueues[routineIndex] { - logger.Infof("Account: %s", account) + accountLog. + WithField("total", len(accounts)). + WithField("indexed", accountsProcessed). + WithField("skipped", accountsSkipped) + + accountLog.Debugf("Account: %s", account) if (accountsProcessed+accountsSkipped)%97 == 0 { - logger. - WithField("indexed", accountsProcessed). - WithField("skipped", accountsSkipped). - Infof("Processed %d/%d accounts", - accountsProcessed+accountsSkipped, len(accounts)) + accountLog.Infof("Processed %d/%d accounts", + accountsProcessed+accountsSkipped, len(accounts)) } - logger.Infof("Reading index for account: %s", account) + accountLog.Debugf("Reading index for account: %s", account) // First, open the "final merged indices" at the root level // for this account. - mergedIndices, mergeErr := outerJobStore.Read(account) + mergedIndices, err := outerJobStore.Read(account) // TODO: in final version this should be critical error, now just skip it - if os.IsNotExist(mergeErr) { - logger.Errorf("Account %s is unavailable - TODO fix", account) + if os.IsNotExist(err) { + accountLog.Errorf("Account %s is unavailable - TODO fix", account) continue - } else if mergeErr != nil { - panic(mergeErr) + } else if err != nil { + panic(err) } // Then, iterate through all of the job folders and merge // indices from all jobs that touched this account. for k := uint32(0); k < config.MapJobCount; k++ { + var jobErr error url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k)) // FIXME: This could probably come from a pool. Every // worker needs to have a connection to every index // store, so there's no reason to re-open these for each // inner loop. - innerJobStore, indexErr := index.Connect(url) - if indexErr != nil { - logger.WithError(indexErr). 
+ innerJobStore, jobErr := index.Connect(url) + if jobErr != nil { + accountLog.WithError(jobErr). Errorf("Failed to open index at %s", url) - panic(indexErr) + panic(jobErr) } - jobIndices, innerJobErr := innerJobStore.Read(account) + jobIndices, jobErr := innerJobStore.Read(account) + // This job never touched this account; skip. - if os.IsNotExist(innerJobErr) { + if os.IsNotExist(jobErr) { continue - } else if innerJobErr != nil { - logger.WithError(innerJobErr). + } else if jobErr != nil { + accountLog.WithError(jobErr). Errorf("Failed to read index for %s", account) - panic(innerJobErr) + panic(jobErr) } - if mergeIndexErr := mergeIndices(mergedIndices, jobIndices); mergeIndexErr != nil { - logger.WithError(mergeIndexErr). + if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil { + accountLog.WithError(jobErr). Errorf("Merge failure for index at %s", url) - panic(mergeIndexErr) + panic(jobErr) } } @@ -228,72 +232,82 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { // Mark this account for other workers to ignore. doneAccounts.Add(account) accountsProcessed++ - logger = logger.WithField("processed", accountsProcessed) + accountLog = accountLog.WithField("processed", accountsProcessed) // Periodically flush to disk to save memory. if accountsProcessed%ACCOUNT_FLUSH_FREQUENCY == 0 { - logger.Infof("Flushing indexed accounts.") + accountLog.Infof("Flushing indexed accounts.") if err = finalIndexStore.Flush(); err != nil { - logger.WithError(err).Errorf("Flush error.") + accountLog.WithError(err).Errorf("Flush error.") panic(err) } } } - jobLogger.Infof("Final account flush.") + accountLog.Infof("Final account flush.") if err = finalIndexStore.Flush(); err != nil { - logger.WithError(err).Errorf("Flush error.") + accountLog.WithError(err).Errorf("Flush error.") panic(err) } // Merge the transaction indexes // There's 256 files, (one for each first byte of the txn hash) - var transactionsProcessed, transactionsSkipped uint64 - logger = jobLogger. - WithField("indexed", transactionsProcessed). - WithField("skipped", transactionsSkipped) + txLog := jobLogger. + WithField("worker", routineIndex). + WithField("subservice", "transactions") + var prefixesProcessed, prefixesSkipped uint64 for i := int(0x00); i <= 0xff; i++ { b := byte(i) // can't loop over range bc overflow - if i%97 == 0 { - logger.Infof("%d transactions processed (%d skipped)", - transactionsProcessed, transactionsSkipped) + if b%97 == 0 { + txLog.Infof("Processed %d/%d prefixes (%d skipped)", + prefixesProcessed, 0xff, prefixesSkipped) } if !config.shouldProcessTx(b, routineIndex) { - transactionsSkipped++ + prefixesSkipped++ continue } - transactionsProcessed++ - prefix := hex.EncodeToString([]byte{b}) + txLog = txLog. + WithField("indexed", prefixesProcessed). 
+ WithField("skipped", prefixesSkipped) + prefix := hex.EncodeToString([]byte{b}) for k := uint32(0); k < config.MapJobCount; k++ { url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k)) - innerJobStore, jobErr := index.Connect(url) - if jobErr != nil { - logger.WithError(jobErr).Errorf("Failed to open index at %s", url) - panic(jobErr) + var innerErr error + + innerJobStore, innerErr := index.Connect(url) + if innerErr != nil { + txLog.WithError(innerErr).Errorf("Failed to open index at %s", url) + panic(innerErr) } - innerTxnIndexes, innerJobErr := innerJobStore.ReadTransactions(prefix) - if os.IsNotExist(innerJobErr) { + innerTxnIndexes, innerErr := innerJobStore.ReadTransactions(prefix) + if os.IsNotExist(innerErr) { continue - } else if innerJobErr != nil { - logger.WithError(innerJobErr).Errorf("Error reading tx prefix %s", prefix) - panic(innerJobErr) + } else if innerErr != nil { + txLog.WithError(innerErr).Errorf("Error reading tx prefix %s", prefix) + panic(innerErr) } - if prefixErr := finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); err != nil { - logger.WithError(prefixErr).Errorf("Error merging txs at prefix %s", prefix) - panic(prefixErr) + if innerErr = finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); innerErr != nil { + txLog.WithError(innerErr).Errorf("Error merging txs at prefix %s", prefix) + panic(innerErr) } } + + prefixesProcessed++ } - jobLogger.Infof("Final transaction flush (%d processed)", transactionsProcessed) + txLog = txLog. + WithField("indexed", prefixesProcessed). + WithField("skipped", prefixesSkipped) + + txLog.Infof("Final transaction flush...") if err = finalIndexStore.Flush(); err != nil { - logger.Errorf("Error flushing transactions: %v", err) + txLog.Errorf("Error flushing transactions: %v", err) panic(err) } }(j) From 7498161e1aeefb013251a6ffca5a1c4ffb7b7fe9 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 30 Jun 2022 13:01:14 -0700 Subject: [PATCH 5/6] Bash script cleanup --- exp/lighthorizon/index/cmd/reduce.sh | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/exp/lighthorizon/index/cmd/reduce.sh b/exp/lighthorizon/index/cmd/reduce.sh index fb85a6f677..4d681b63f9 100755 --- a/exp/lighthorizon/index/cmd/reduce.sh +++ b/exp/lighthorizon/index/cmd/reduce.sh @@ -1,4 +1,11 @@ #!/bin/bash +# +# Combines indices that were built separately in different folders into a single +# set of indices. +# +# This focuses on starting parallel processes, but the Golang side does +# validation that the reduce jobs resulted in the correct indices. +# # check parameters and their validity (types, existence, etc.) @@ -18,12 +25,6 @@ if [[ ! -d "$2" ]]; then mkdir -p $2 fi -go build -o reduce ./batch/reduce/... -if [[ "$?" -ne "0" ]]; then - echo "Build failed" - exit 1 -fi - MAP_JOB_COUNT=$(ls $1 | grep -E 'job_[0-9]+' | wc -l) if [[ "$MAP_JOB_COUNT" -le "0" ]]; then echo "No jobs in index src root ('$1') found." @@ -31,6 +32,14 @@ if [[ "$MAP_JOB_COUNT" -le "0" ]]; then fi REDUCE_JOB_COUNT=$MAP_JOB_COUNT +# build reduce program and start it up + +go build -o reduce ./batch/reduce/... +if [[ "$?" -ne "0" ]]; then + echo "Build failed" + exit 1 +fi + echo "Coalescing $MAP_JOB_COUNT discovered job outputs from $1 into $2..." pids=( ) @@ -61,6 +70,6 @@ for i in "${!pids[@]}"; do fi done -rm ./reduce +rm ./reduce # cleanup echo "All jobs succeeded!" 
exit 0 \ No newline at end of file From 9bf6b448f7a0ccd3b478ee12c55a76c8522a151f Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 30 Jun 2022 13:05:09 -0700 Subject: [PATCH 6/6] Variable shadowing fixups --- exp/lighthorizon/index/cmd/batch/reduce/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go index bd8cfc028a..20ce0ac877 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/main.go +++ b/exp/lighthorizon/index/cmd/batch/reduce/main.go @@ -181,14 +181,14 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { // First, open the "final merged indices" at the root level // for this account. - mergedIndices, err := outerJobStore.Read(account) + mergedIndices, readErr := outerJobStore.Read(account) // TODO: in final version this should be critical error, now just skip it - if os.IsNotExist(err) { + if os.IsNotExist(readErr) { accountLog.Errorf("Account %s is unavailable - TODO fix", account) continue } else if err != nil { - panic(err) + panic(readErr) } // Then, iterate through all of the job folders and merge @@ -237,9 +237,9 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { // Periodically flush to disk to save memory. if accountsProcessed%ACCOUNT_FLUSH_FREQUENCY == 0 { accountLog.Infof("Flushing indexed accounts.") - if err = finalIndexStore.Flush(); err != nil { - accountLog.WithError(err).Errorf("Flush error.") - panic(err) + if flushErr := finalIndexStore.Flush(); flushErr != nil { + accountLog.WithError(flushErr).Errorf("Flush error.") + panic(flushErr) } } }
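
Two closing notes on the fixes in this series. First, on PATCH 3: the
transaction indices are sharded into 256 files, one per leading byte of
the transaction hash, so the merge loop must visit every prefix from 0x00
through 0xff inclusive. The pre-patch bound `i < 0xff` on a byte-typed
index silently skipped the 0xff file, and flipping it to `<=` would not
have helped, since a byte can never exceed 0xff and the loop would wrap
around and never terminate. The standalone sketch below demonstrates both
pitfalls and the int-with-cast idiom the patch adopts:

package main

import "fmt"

func main() {
	// Pre-patch bound: a byte index with `i < 0xff` stops at 0xfe, so
	// the 0xff prefix file is never merged.
	visited := 0
	for i := byte(0x00); i < 0xff; i++ {
		visited++
	}
	fmt.Println(visited) // 255 prefixes, not 256

	// `i <= 0xff` on a byte would be worse: the condition is always
	// true because a byte wraps from 0xff back to 0x00, so the loop
	// never ends. Hence the patch iterates over an int and casts.
	visited = 0
	for i := int(0x00); i <= 0xff; i++ {
		_ = byte(i) // safe: i stays within [0x00, 0xff]
		visited++
	}
	fmt.Println(visited) // 256
}
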
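Second, on PATCH 6: the renames (readErr, flushErr) keep later branches
from consulting a stale outer `err` inside the worker goroutine. One thing
the hunk leaves untouched is the context line `} else if err != nil {`,
which still tests the outer variable, so a Read failure other than
os.ErrNotExist appears to be silently dropped rather than reaching
panic(readErr). The sketch below shows the pitfall and the fully renamed
form; the read helper is hypothetical, standing in for outerJobStore.Read.

package main

import (
	"errors"
	"fmt"
	"os"
)

// read stands in for outerJobStore.Read and always fails with a
// non-NotExist error, the exact case a stale error check mishandles.
func read(account string) ([]byte, error) {
	return nil, errors.New("corrupt index")
}

func main() {
	var err error // outer error; nil by the time the account loop runs

	mergedIndices, readErr := read("GABC...")
	if os.IsNotExist(readErr) {
		fmt.Println("account unavailable, skipping")
	} else if err != nil { // BUG: tests the stale outer err (nil)...
		panic(readErr) // ...so the corrupt-index failure is dropped
	}
	fmt.Println("merging", mergedIndices, "despite the failed read")

	// The safe form tests the variable it just assigned:
	if os.IsNotExist(readErr) {
		fmt.Println("account unavailable, skipping")
	} else if readErr != nil {
		panic(readErr) // now actually fires on non-NotExist failures
	}
}
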