exp/lighthorizon/index: More testing for batch indexing and off-by-one bugfix. #4442

Merged: 6 commits, Jul 14, 2022

Changes from all commits
137 changes: 76 additions & 61 deletions exp/lighthorizon/index/cmd/batch/reduce/main.go
@@ -159,66 +159,70 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
for j := uint32(0); j < config.Workers; j++ {
go func(routineIndex uint32) {
defer wg.Done()
logger := jobLogger.
accountLog := jobLogger.
WithField("worker", routineIndex).
WithField("total", len(accounts))
logger.Info("Started worker")
WithField("subservice", "accounts")
accountLog.Info("Started worker")

var accountsProcessed, accountsSkipped uint64
for account := range workQueues[routineIndex] {
logger.Infof("Account: %s", account)
accountLog.
WithField("total", len(accounts)).
WithField("indexed", accountsProcessed).
WithField("skipped", accountsSkipped)

accountLog.Debugf("Account: %s", account)
if (accountsProcessed+accountsSkipped)%97 == 0 {
logger.
WithField("indexed", accountsProcessed).
WithField("skipped", accountsSkipped).
Infof("Processed %d/%d accounts",
accountsProcessed+accountsSkipped, len(accounts))
accountLog.Infof("Processed %d/%d accounts",
accountsProcessed+accountsSkipped, len(accounts))
}

logger.Infof("Reading index for account: %s", account)
accountLog.Debugf("Reading index for account: %s", account)

// First, open the "final merged indices" at the root level
// for this account.
mergedIndices, mergeErr := outerJobStore.Read(account)
mergedIndices, readErr := outerJobStore.Read(account)

// TODO: in final version this should be critical error, now just skip it
if os.IsNotExist(mergeErr) {
logger.Errorf("Account %s is unavailable - TODO fix", account)
if os.IsNotExist(readErr) {
accountLog.Errorf("Account %s is unavailable - TODO fix", account)
continue
} else if mergeErr != nil {
panic(mergeErr)
} else if readErr != nil {
panic(readErr)
}

// Then, iterate through all of the job folders and merge
// indices from all jobs that touched this account.
for k := uint32(0); k < config.MapJobCount; k++ {
var jobErr error
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))

// FIXME: This could probably come from a pool. Every
// worker needs to have a connection to every index
// store, so there's no reason to re-open these for each
// inner loop.
innerJobStore, indexErr := index.Connect(url)
if indexErr != nil {
logger.WithError(indexErr).
innerJobStore, jobErr := index.Connect(url)
if jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Failed to open index at %s", url)
panic(indexErr)
panic(jobErr)
}

jobIndices, innerJobErr := innerJobStore.Read(account)
jobIndices, jobErr := innerJobStore.Read(account)

// This job never touched this account; skip.
if os.IsNotExist(innerJobErr) {
if os.IsNotExist(jobErr) {
continue
} else if innerJobErr != nil {
logger.WithError(innerJobErr).
} else if jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Failed to read index for %s", account)
panic(innerJobErr)
panic(jobErr)
}

if mergeIndexErr := mergeIndices(mergedIndices, jobIndices); mergeIndexErr != nil {
logger.WithError(mergeIndexErr).
if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Merge failure for index at %s", url)
panic(mergeIndexErr)
panic(jobErr)
}
}
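
The FIXME above notes that every worker re-opens a connection to every job's index store once per account. A minimal pooling sketch, assuming only the index.Connect signature used in this diff and the repo's index.Store interface (the storePool type itself is hypothetical, not part of this PR):

    import (
    	"sync"

    	"github.com/stellar/go/exp/lighthorizon/index"
    )

    // storePool hands out one shared index.Store per URL, so each job store
    // is opened once per process instead of once per account per worker.
    type storePool struct {
    	mu     sync.Mutex
    	stores map[string]index.Store
    }

    func newStorePool() *storePool {
    	return &storePool{stores: make(map[string]index.Store)}
    }

    func (p *storePool) get(url string) (index.Store, error) {
    	p.mu.Lock()
    	defer p.mu.Unlock()
    	if store, ok := p.stores[url]; ok {
    		return store, nil
    	}
    	store, err := index.Connect(url)
    	if err != nil {
    		return nil, err
    	}
    	p.stores[url] = store
    	return store, nil
    }

Workers would then call pool.get(url) in place of index.Connect(url) inside the inner loops.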

@@ -228,71 +232,82 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
// Mark this account for other workers to ignore.
doneAccounts.Add(account)
accountsProcessed++
logger = logger.WithField("processed", accountsProcessed)
accountLog = accountLog.WithField("processed", accountsProcessed)

// Periodically flush to disk to save memory.
if accountsProcessed%ACCOUNT_FLUSH_FREQUENCY == 0 {
logger.Infof("Flushing indexed accounts.")
if err = finalIndexStore.Flush(); err != nil {
logger.WithError(err).Errorf("Flush error.")
panic(err)
accountLog.Infof("Flushing indexed accounts.")
if flushErr := finalIndexStore.Flush(); flushErr != nil {
accountLog.WithError(flushErr).Errorf("Flush error.")
panic(flushErr)
}
}
}

jobLogger.Infof("Final account flush.")
accountLog.Infof("Final account flush.")
if err = finalIndexStore.Flush(); err != nil {
logger.WithError(err).Errorf("Flush error.")
accountLog.WithError(err).Errorf("Flush error.")
panic(err)
}

// Merge the transaction indexes
// There's 256 files, (one for each first byte of the txn hash)
var transactionsProcessed, transactionsSkipped uint64
logger = jobLogger.
WithField("indexed", transactionsProcessed).
WithField("skipped", transactionsSkipped)

for i := byte(0x00); i < 0xff; i++ {
if i%97 == 0 {
logger.Infof("%d transactions processed (%d skipped)",
transactionsProcessed, transactionsSkipped)
txLog := jobLogger.
WithField("worker", routineIndex).
WithField("subservice", "transactions")

var prefixesProcessed, prefixesSkipped uint64
for i := int(0x00); i <= 0xff; i++ {
b := byte(i) // can't loop over range bc overflow
Comment on lines +260 to +261

Contributor: Wow, I was staring at this for 10 minutes trying to understand what's going on here. Nice one.

Contributor (author): Yeahh.. took a lot more than 10 minutes to figure out why the indices were incorrect 😆

For anyone else curious:

  • if you do for b := byte(0x00); b < 0xff; b++, then you have an off-by-one because you don't include 0xff
  • however, if you decide to do for b := byte(0x00); b <= 0xff; b++ (that is, change < to <=), then you introduce an overflow error, because (0xff)++ wraps to 0x00 and the loop never terminates
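
A minimal, self-contained sketch of both failure modes and the pattern the fix uses (loop over an int, convert to a byte in the body); the prefix computation mirrors the hex.EncodeToString call in this diff:

    package main

    import (
    	"encoding/hex"
    	"fmt"
    )

    func main() {
    	// for b := byte(0x00); b < 0xff; b++  -> off-by-one: prefix "ff" is skipped.
    	// for b := byte(0x00); b <= 0xff; b++ -> never ends: byte(0xff)+1 wraps to 0x00.
    	// Fix: iterate over an int, which can represent 256, and convert per iteration.
    	count := 0
    	for i := 0; i <= 0xff; i++ {
    		b := byte(i)
    		prefix := hex.EncodeToString([]byte{b}) // "00", "01", ..., "ff"
    		_ = prefix                              // one transaction index file per prefix
    		count++
    	}
    	fmt.Println(count) // 256: every prefix visited exactly once, including "ff"
    }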

if b%97 == 0 {
txLog.Infof("Processed %d/%d prefixes (%d skipped)",
prefixesProcessed, 0xff, prefixesSkipped)
}

if !config.shouldProcessTx(i, routineIndex) {
transactionsSkipped++
if !config.shouldProcessTx(b, routineIndex) {
prefixesSkipped++
continue
}
transactionsProcessed++

prefix := hex.EncodeToString([]byte{i})
txLog = txLog.
WithField("indexed", prefixesProcessed).
WithField("skipped", prefixesSkipped)

prefix := hex.EncodeToString([]byte{b})
for k := uint32(0); k < config.MapJobCount; k++ {
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
innerJobStore, jobErr := index.Connect(url)
if jobErr != nil {
logger.WithError(jobErr).Errorf("Failed to open index at %s", url)
panic(jobErr)
var innerErr error

innerJobStore, innerErr := index.Connect(url)
if innerErr != nil {
txLog.WithError(innerErr).Errorf("Failed to open index at %s", url)
panic(innerErr)
}

innerTxnIndexes, innerJobErr := innerJobStore.ReadTransactions(prefix)
if os.IsNotExist(innerJobErr) {
innerTxnIndexes, innerErr := innerJobStore.ReadTransactions(prefix)
if os.IsNotExist(innerErr) {
continue
} else if innerJobErr != nil {
logger.WithError(innerJobErr).Errorf("Error reading tx prefix %s", prefix)
panic(innerJobErr)
} else if innerErr != nil {
txLog.WithError(innerErr).Errorf("Error reading tx prefix %s", prefix)
panic(innerErr)
}

if prefixErr := finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); err != nil {
logger.WithError(prefixErr).Errorf("Error merging txs at prefix %s", prefix)
panic(prefixErr)
if innerErr = finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); innerErr != nil {
txLog.WithError(innerErr).Errorf("Error merging txs at prefix %s", prefix)
panic(innerErr)
}
}

prefixesProcessed++
}

jobLogger.Infof("Final transaction flush (%d processed)", transactionsProcessed)
txLog = txLog.
WithField("indexed", prefixesProcessed).
WithField("skipped", prefixesSkipped)

txLog.Infof("Final transaction flush...")
if err = finalIndexStore.Flush(); err != nil {
logger.Errorf("Error flushing transactions: %v", err)
txLog.Errorf("Error flushing transactions: %v", err)
panic(err)
}
}(j)
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/cmd/map.sh
@@ -66,7 +66,7 @@ fi
pids=( )
for (( i=0; i < $BATCH_COUNT; i++ ))
do
echo -n "Creating job $i... "
echo -n "Creating map job $i... "

AWS_BATCH_JOB_ARRAY_INDEX=$i BATCH_SIZE=$BATCH_SIZE FIRST_CHECKPOINT=$FIRST \
TXMETA_SOURCE=file://$1 INDEX_TARGET=file://$2 WORKER_COUNT=1 \
@@ -1,6 +1,7 @@
package main_test

import (
"encoding/hex"
"fmt"
"io"
"net/url"
@@ -22,6 +23,44 @@ const (
)

func TestMap(t *testing.T) {
RunMapTest(t)
}

func TestReduce(t *testing.T) {
// First, map the index files like we normally would.
startLedger, endLedger, jobRoot := RunMapTest(t)
batchCount := (endLedger - startLedger + batchSize) / batchSize // ceil(ledgerCount / batchSize)

// Now that indices have been "map"ped, reduce them to a single store.

indexTarget := filepath.Join(t.TempDir(), "final-indices")
reduceTestCmd := exec.Command("./reduce.sh", jobRoot, indexTarget)
t.Logf("Running %d reduce jobs: %s", batchCount, reduceTestCmd.String())
stdout, err := reduceTestCmd.CombinedOutput()
t.Logf(string(stdout))
require.NoError(t, err)

// Then, build the *same* indices using the single-process tester.

t.Logf("Building baseline for ledger range [%d, %d]", startLedger, endLedger)
hashes, participants := IndexLedgerRange(t, txmetaSource, startLedger, endLedger)

// Finally, compare the two to make sure the reduce job did what it's
// supposed to do.

indexStore, err := index.Connect("file://" + indexTarget)
require.NoError(t, err)
stores := []index.Store{indexStore} // to reuse code: same as array of 1 store

assertParticipantsEqual(t, keysU32(participants), stores)
for account, checkpoints := range participants {
assertParticipantCheckpointsEqual(t, account, checkpoints, stores)
}

assertTOIDsEqual(t, hashes, stores)
}
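
The batchCount expression above is the standard integer ceiling trick: for the inclusive range, ledgerCount = endLedger - startLedger + 1, and ceil(n/b) = (n + b - 1) / b in integer arithmetic, so (endLedger - startLedger + batchSize) / batchSize is exactly ceil(ledgerCount / batchSize). A quick check with hypothetical values:

    package main

    import "fmt"

    func main() {
    	// 256 ledgers in the inclusive range [64, 319], batches of 64.
    	startLedger, endLedger, batchSize := uint32(64), uint32(319), uint32(64)
    	ledgerCount := endLedger - startLedger + 1                      // 256
    	batchCount := (endLedger - startLedger + batchSize) / batchSize // (319-64+64)/64 = 4
    	fmt.Println(ledgerCount, batchCount)                            // 256 4
    }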

func RunMapTest(t *testing.T) (uint32, uint32, string) {
// Only file:// style URLs for the txmeta source are allowed while testing.
parsed, err := url.Parse(txmetaSource)
require.NoErrorf(t, err, "%s is not a valid URL", txmetaSource)
@@ -78,8 +117,6 @@ func TestMap(t *testing.T) {
// Then, build the *same* indices using the single-process tester.
t.Logf("Building baseline for ledger range [%d, %d]", startLedger, endLedger)
hashes, participants := IndexLedgerRange(t, txmetaSource, startLedger, endLedger)
require.NotNil(t, hashes)
require.NotNil(t, participants)

// Now, walk through the mapped indices and ensure that at least one of the
// jobs reported the same indices for tx TOIDs and participation.
@@ -103,6 +140,10 @@ func TestMap(t *testing.T) {
for account, checkpoints := range participants {
assertParticipantCheckpointsEqual(t, account, checkpoints, stores)
}

assertTOIDsEqual(t, hashes, stores)

return startLedger, endLedger, tempDir
}

func assertParticipantsEqual(t *testing.T,
@@ -164,6 +205,31 @@ func assertParticipantCheckpointsEqual(t *testing.T,
}
}

func assertTOIDsEqual(t *testing.T, toids map[string]int64, stores []index.Store) {
for hash, toid := range toids {
rawHash := [32]byte{}
decodedHash, err := hex.DecodeString(hash)
require.NoError(t, err)
require.Lenf(t, decodedHash, 32, "invalid tx hash length")
copy(rawHash[:], decodedHash)

found := false
for i, store := range stores {
storeToid, err := store.TransactionTOID(rawHash)
if err != nil {
require.ErrorIsf(t, err, io.EOF,
"only EOF errors are allowed (store %d, hash %s)", i, hash)
} else {
require.Equalf(t, toid, storeToid,
"TOIDs for tx 0x%s don't match (store %d)", hash, i)
found = true
}
}

require.Truef(t, found, "TOID for tx 0x%s not found in stores", hash)
}
}

func keysU32(dict map[string][]uint32) []string {
result := make([]string, 0, len(dict))
for key := range dict {