exp/lighthorizon/cmd: index batch reduce tweaks #4552

Merged: 4 commits, merged on Aug 31, 2022

Changes from 3 commits
11 changes: 6 additions & 5 deletions .github/workflows/horizon.yml
@@ -122,6 +122,7 @@ jobs:
- if: github.ref == 'refs/heads/master'
name: Push to DockerHub
run: docker push stellar/horizon-verify-range:latest

horizon-light:
name: Test and Push the horizon light images
runs-on: ubuntu-latest
@@ -141,18 +142,18 @@
docker run -e ARCHIVE_TARGET=file:///ledgerexport\
-e START=5\
-e END=150\
-e NETWORK_PASSPHRASE="Test SDF Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-testnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-testnet/core_testnet_001,https://history.stellar.org/prd/core-testnet/core_testnet_002"\
-e NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-pubnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-live/core_live_001"\
-v $PWD/ledgerexport:/ledgerexport\
stellar/lighthorizon-ledgerexporter

# run map job
-          docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
+          docker run -e NETWORK_PASSPHRASE='prodnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
-e WORKER_COUNT=1 -e RUN_MODE=map -v $PWD/ledgerexport:/ledgermeta -e TXMETA_SOURCE=file:///ledgermeta -v $PWD/index:/index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch

# run reduce job
-          docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
+          docker run -e NETWORK_PASSPHRASE='prodnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
-e WORKER_COUNT=1 -e RUN_MODE=reduce -v $PWD/index:/index -e INDEX_SOURCE_ROOT=file:///index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch

# Push images
2 changes: 1 addition & 1 deletion exp/lighthorizon/build/index-batch/README.md
@@ -1,4 +1,4 @@
-# `stellar/horizon-indexer`
+# `stellar/lighthorizon-index-batch`

This docker image contains the ledger/checkpoint indexing executables. It allows running multiple instances of `map`/`reduce` on a single machine or running it in [AWS Batch](https://aws.amazon.com/batch/).

@@ -20,11 +20,11 @@ spec:
          - name: RUN_MODE
            value: "reduce"
          - name: MAP_JOB_COUNT
-           value: 52
+           value: "52"
          - name: REDUCE_JOB_COUNT
-           value: 52
+           value: "52"
          - name: WORKER_COUNT
-           value: 8
+           value: "8"
          - name: INDEX_SOURCE_ROOT
            value: "<url of index location>"
          - name: JOB_INDEX_ENV
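The quoting added above matters because Kubernetes requires env var values to be strings: YAML decodes a bare 52 as an integer, and the API server rejects the manifest. A minimal sketch of the difference, using gopkg.in/yaml.v3 (not part of this PR) purely to show the decoded types:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

func main() {
	var v struct {
		Value interface{} `yaml:"value"`
	}

	// A bare numeral decodes as an int, which fails validation against
	// the string-typed EnvVar value field of a Kubernetes manifest...
	_ = yaml.Unmarshal([]byte(`value: 52`), &v)
	fmt.Printf("%T\n", v.Value) // int

	// ...while a quoted numeral stays a string.
	_ = yaml.Unmarshal([]byte(`value: "52"`), &v)
	fmt.Printf("%T\n", v.Value) // string
}
```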
1 change: 0 additions & 1 deletion exp/lighthorizon/build/k8s/lighthorizon_index.yml
@@ -47,7 +47,6 @@ spec:
        fluxcd.io/ignore: "true"
        prometheus.io/port: "6060"
        prometheus.io/scrape: "false"
-     creationTimestamp: null
      labels:
        app: lighthorizon-pubnet-index
    spec:
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/file_test.go
@@ -15,7 +15,7 @@ func TestSimpleFileStore(t *testing.T) {
// Create a large (beyond a single chunk) list of arbitrary accounts, some
// regular and some muxed.
accountList := make([]string, 123)
-	for i, _ := range accountList {
+	for i := range accountList {
var err error
var muxed xdr.MuxedAccount
address := keypair.MustRandom().Address()
36 changes: 22 additions & 14 deletions exp/lighthorizon/index/cmd/batch/reduce/main.go
@@ -2,10 +2,8 @@ package main

import (
	"encoding/hex"
-	"fmt"
	"hash/fnv"
	"os"
-	"path/filepath"
	"strconv"
	"strings"
	"sync"
@@ -42,7 +40,7 @@ func ReduceConfigFromEnvironment() (*ReduceConfig, error) {
indexTargetEnv = "INDEX_TARGET"
)

-	jobIndexEnv := os.Getenv(jobIndexEnvName)
+	jobIndexEnv := strings.TrimSpace(os.Getenv(jobIndexEnvName))
if jobIndexEnv == "" {
return nil, errors.New("env variable can't be empty " + jobIndexEnvName)
}
@@ -114,12 +112,15 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
for i := uint32(0); i < config.MapJobCount; i++ {
jobLogger := log.WithField("job", i)

-		url := filepath.Join(config.IndexRootSource, "job_"+strconv.FormatUint(uint64(i), 10))
-		jobLogger.Infof("Connecting to %s", url)
+		jobSubPath := "job_" + strconv.FormatUint(uint64(i), 10)
+		jobLogger.Infof("Connecting to url %s, sub-path %s", config.IndexRootSource, jobSubPath)
+		outerJobStore, err := index.ConnectWithConfig(index.StoreConfig{
+			URL:        config.IndexRootSource,
+			URLSubPath: jobSubPath,
+		})

-		outerJobStore, err := index.Connect(url)
		if err != nil {
-			return errors.Wrapf(err, "failed to connect to indices at %s", url)
+			return errors.Wrapf(err, "failed to connect to indices at %s, sub-path %s", config.IndexRootSource, jobSubPath)
}

accounts, err := outerJobStore.ReadAccounts()
@@ -201,16 +202,20 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
// indices from all jobs that touched this account.
for k := uint32(0); k < config.MapJobCount; k++ {
var jobErr error
-			url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))

			// FIXME: This could probably come from a pool. Every
			// worker needs to have a connection to every index
			// store, so there's no reason to re-open these for each
			// inner loop.
-			innerJobStore, jobErr := index.Connect(url)
+			innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
+			innerJobStore, jobErr := index.ConnectWithConfig(index.StoreConfig{
+				URL:        config.IndexRootSource,
+				URLSubPath: innerJobSubPath,
+			})

			if jobErr != nil {
				accountLog.WithError(jobErr).
-					Errorf("Failed to open index at %s", url)
+					Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}

@@ -227,7 +232,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Merge failure for index at %s", url)
Errorf("Merge failure for index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}
}
@@ -281,12 +286,15 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

prefix := hex.EncodeToString([]byte{b})
for k := uint32(0); k < config.MapJobCount; k++ {
-			url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
			var innerErr error
+			innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
+			innerJobStore, innerErr := index.ConnectWithConfig(index.StoreConfig{
+				URL:        config.IndexRootSource,
+				URLSubPath: innerJobSubPath,
+			})

-			innerJobStore, innerErr := index.Connect(url)
			if innerErr != nil {
-				txLog.WithError(innerErr).Errorf("Failed to open index at %s", url)
+				txLog.WithError(innerErr).Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(innerErr)
}

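The strings.TrimSpace change in ReduceConfigFromEnvironment above hardens the env parsing: a value like " 52\n" injected by a batch template would previously pass the empty check and then trip up downstream parsing. A minimal sketch of the pattern, assuming a hypothetical helper name and plain fmt error wrapping rather than the package's own error helpers:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// requiredUint32FromEnv is a hypothetical helper showing the pattern:
// trim first, then reject empty values, then parse.
func requiredUint32FromEnv(name string) (uint32, error) {
	raw := strings.TrimSpace(os.Getenv(name))
	if raw == "" {
		return 0, fmt.Errorf("env variable can't be empty %s", name)
	}
	v, err := strconv.ParseUint(raw, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid value %q for %s: %w", raw, name, err)
	}
	return uint32(v), nil
}

func main() {
	os.Setenv("MAP_JOB_COUNT", " 52\n") // stray whitespace from a template
	n, err := requiredUint32FromEnv("MAP_JOB_COUNT")
	fmt.Println(n, err) // 52 <nil>
}
```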
10 changes: 10 additions & 0 deletions exp/lighthorizon/index/connect.go
@@ -24,6 +24,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
}
switch parsed.Scheme {
case "s3":
+		s3Url := fmt.Sprintf("%s/%s", config.URL, config.URLSubPath)
+		parsed, err = url.Parse(s3Url)
+		if err != nil {
+			return nil, err
+		}
awsConfig := &aws.Config{}
query := parsed.Query()
if region := query.Get("region"); region != "" {
@@ -33,6 +38,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
return NewS3Store(awsConfig, parsed.Host, parsed.Path, config)

case "file":
+		fileUrl := filepath.Join(config.URL, config.URLSubPath)
+		parsed, err = url.Parse(fileUrl)
+		if err != nil {
+			return nil, err
+		}
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), config)

default:
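For callers, the effect of the connect.go change is that the reduce job hands the store the index root and the per-job sub-path separately, and the store composes them in a scheme-aware way. A hedged usage sketch: the bucket URL and worker count are invented here, while the StoreConfig fields and ReadAccounts come from this diff:

```go
package main

import (
	"log"

	"github.com/stellar/go/exp/lighthorizon/index"
)

func main() {
	// Connect to map job 0's output under a hypothetical index root.
	// The store appends "job_0" itself, preserving the s3:// scheme.
	store, err := index.ConnectWithConfig(index.StoreConfig{
		URL:        "s3://index-bucket",
		URLSubPath: "job_0",
		Workers:    8,
	})
	if err != nil {
		log.Fatal(err)
	}

	accounts, err := store.ReadAccounts()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("map job 0 indexed %d accounts", len(accounts))
}
```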
7 changes: 5 additions & 2 deletions exp/lighthorizon/index/store.go
@@ -38,8 +38,11 @@ type Store interface {

type StoreConfig struct {
	// init time config
-	URL     string
-	Workers uint32
+	// the base url for the store resource
+	URL string
+	// optional url path to append to the base url to realize the complete url
+	URLSubPath string
Reviewer (contributor):

Can you explain the rationale for putting this into the store itself? It feels odd to me that the index store should care about something like a sub-URL when the caller can just do a Join(basePath, subPath) prior to calling index.ConnectWithConfig. That is, above in this diff, why not do the string join before connecting, as it was before?

Author (sreuland):

filepath.Join("s3://testing", "job_0") mangles the slashes in the proto scheme; the result in this example is s3:/testing/job_0, which the AWS SDK didn't like. So the URL path composition was moved down to the point where the concrete store has already been determined from the scheme, and URL paths are built in that known context, rather than every caller duplicating the same scheme check to work out how to compose the path.

Reviewer (contributor):

I see 🤔 it still seems a little weird to me, as I think it's the caller's responsibility to build sane paths, but I can understand the rationale for hiding it as well.
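The mangling described above is easy to reproduce: filepath.Join treats its arguments as a filesystem path and cleans the result, collapsing the double slash after the scheme. A standalone sketch, with one scheme-preserving alternative via net/url shown purely for illustration (it is not what the package does):

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"path/filepath"
)

func main() {
	// filepath.Join cleans the result, collapsing "s3://" to "s3:/",
	// which the AWS SDK rejects as a URL.
	fmt.Println(filepath.Join("s3://testing", "job_0")) // s3:/testing/job_0

	// Joining only the path component leaves the scheme intact.
	u, err := url.Parse("s3://testing")
	if err != nil {
		panic(err)
	}
	u.Path = path.Join("/", u.Path, "job_0")
	fmt.Println(u.String()) // s3://testing/job_0
}
```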

+	Workers uint32

// runtime config
ClearMemoryOnFlush bool