exp/lighthorizon/build/index-batch: merge map/reduce updates to latest on feature branch #4543

Merged · 11 commits · Aug 17, 2022 · Changes from 8 commits
19 changes: 16 additions & 3 deletions .github/workflows/horizon.yml
@@ -134,14 +134,27 @@ jobs:
# Any range should do for basic testing, this range was chosen pretty early in history so that it only takes a few mins to run
run: |
chmod 755 ./exp/lighthorizon/build/build.sh
./exp/lighthorizon/build/build.sh ledgerexporter stellar latest false
docker run -e ARCHIVE_TARGET=file:///ledgerexport-test\
mkdir $PWD/ledgerexport
mkdir $PWD/index

./exp/lighthorizon/build/build.sh all stellar latest false
docker run -e ARCHIVE_TARGET=file:///ledgerexport\
-e START=5\
-e END=50\
-e END=150\
-e NETWORK_PASSPHRASE="Test SDF Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-testnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-testnet/core_testnet_001,https://history.stellar.org/prd/core-testnet/core_testnet_002"\
> **@2opremio** (Contributor, Aug 28, 2022): I'm not sure it's a good idea to use testnet (since it can be reset).

> **Contributor**: Also, there's a missing space before the `horizon-light:` job.

> **Contributor (Author)**: Yes, the intent was to avoid the long catch-up seen on pubnet ranges. The trade-off is a potential window of a couple of minutes when a newly reset testnet hasn't yet generated at least 150 ledgers, during which this CI job step would fail if it was invoked at the same time.
>
> I changed it in a small follow-up PR #4552 to reference the same early range on pubnet. Maybe that suffices; I need to check how long catch-up runs there. If it's quick, it should be fine.

-v $PWD/ledgerexport:/ledgerexport\
stellar/lighthorizon-ledgerexporter

# run map job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
-e WORKER_COUNT=1 -e RUN_MODE=map -v $PWD/ledgerexport:/ledgermeta -e TXMETA_SOURCE=file:///ledgermeta -v $PWD/index:/index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch

# run reduce job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
-e WORKER_COUNT=1 -e RUN_MODE=reduce -v $PWD/index:/index -e INDEX_SOURCE_ROOT=file:///index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch

# Push images
- if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/lighthorizon'
name: Login to DockerHub
7 changes: 7 additions & 0 deletions exp/lighthorizon/build/index-batch/README.md
@@ -0,0 +1,7 @@
# `stellar/horizon-indexer`
> **Contributor**: Isn't the image called `stellar/lighthorizon-index-batch`?
>
> **Contributor (Author)**: Yes, thanks for the catch; fixed in the small follow-up PR #4552.


This docker image contains the ledger/checkpoint indexing executables. It allows running multiple instances of `map`/`reduce` on a single machine or running it in [AWS Batch](https://aws.amazon.com/batch/).

## Env variables

See the [package documentation](../../index/cmd/batch/doc.go) for more details.
1 change: 0 additions & 1 deletion exp/lighthorizon/build/index-batch/start
@@ -5,7 +5,6 @@ set -e

export TRACY_NO_INVARIANT_CHECK=1
NETWORK_PASSPHRASE="${NETWORK_PASSPHRASE:=Public Global Stellar Network ; September 2015}"

if [ "$RUN_MODE" == "reduce" ]; then
echo "Running Reduce, REDUCE JOBS: $REDUCE_JOB_COUNT MAP JOBS: $MAP_JOB_COUNT TARGET INDEX: $INDEX_TARGET"
/reduce
43 changes: 43 additions & 0 deletions exp/lighthorizon/build/k8s/lighthorizon_batch_map_job.yml
@@ -0,0 +1,43 @@
apiVersion: batch/v1
kind: Job
metadata:
name: 'batch-map-job'
spec:
completions: 52
parallelism: 10
> **Contributor**: What does this mean?
>
> **Contributor (Author)**: The k8s Job will run at most 10 pods concurrently, and will keep starting a new pod whenever one exits, until 52 pods have been launched; each pod is assigned a unique JOB_COMPLETION_INDEX from 0 to 51.
>
> **Contributor**: Very cool, thanks!

completionMode: Indexed
template:
spec:
restartPolicy: Never
containers:
- name: 'worker'
image: 'stellar/lighthorizon-index-batch'
imagePullPolicy: Always
envFrom:
- secretRef:
name: <reference to secret name here if needed for source/target>
env:
- name: RUN_MODE
value: "map"
- name: BATCH_SIZE
value: "10048"
- name: FIRST_CHECKPOINT
value: "41426080"
> **Contributor**: Arbitrary?
>
> **Contributor (Author)**: Yes. @2opremio mentioned running over roughly the last month to start with, to verify the job stream; this checkpoint is around July 1st.

- name: WORKER_COUNT
value: "8"
> **Contributor**: 8 workers but 4 CPUs? 🤔
>
> **Contributor (Author)**: Yes, a little bit of time slicing. These values are only suggestive; the dev cluster this is aiming to deploy on doesn't have a huge amount of resources available. This also ties into `parallelism: 10`, which acts as a multiplier on the CPU/RAM limits here for the job's total footprint.
>
> **Contributor**: I think the process is mostly IO-bound, so it's fine to have more workers than CPUs.
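A minimal Go sketch of the pattern the thread describes: an IO-bound worker pool where the worker count deliberately exceeds the CPU count, since workers spend most of their time waiting on network or disk. The `processCheckpoint` and `runWorkers` names are illustrative, not taken from the repo:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processCheckpoint simulates one IO-bound unit of work, e.g. fetching a
// checkpoint's txmeta from a remote archive (hypothetical stand-in).
func processCheckpoint(id int) string {
	time.Sleep(10 * time.Millisecond) // simulated network/disk latency
	return fmt.Sprintf("checkpoint-%d", id)
}

// runWorkers fans `jobs` units of work out over `workerCount` goroutines.
// Because each unit mostly waits on IO, workerCount can usefully exceed
// the number of CPUs available to the container.
func runWorkers(jobs, workerCount int) []string {
	in := make(chan int)
	out := make(chan string, jobs)

	var wg sync.WaitGroup
	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range in {
				out <- processCheckpoint(id)
			}
		}()
	}

	for i := 0; i < jobs; i++ {
		in <- i
	}
	close(in)
	wg.Wait()
	close(out)

	results := make([]string, 0, jobs)
	for r := range out {
		results = append(results, r)
	}
	return results
}

func main() {
	// 8 workers regardless of CPU count, mirroring WORKER_COUNT=8 on 4 CPUs.
	results := runWorkers(16, 8)
	fmt.Println(len(results))
}
```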

- name: TXMETA_SOURCE
value: "<url of txmeta source>"
- name: JOB_INDEX_ENV
value: "JOB_COMPLETION_INDEX"
- name: NETWORK_PASSPHRASE
value: "pubnet"
- name: INDEX_TARGET
value: "url of target index"
resources:
limits:
cpu: 4
memory: 5Gi
requests:
cpu: 500m
memory: 500Mi

42 changes: 42 additions & 0 deletions exp/lighthorizon/build/k8s/lighthorizon_batch_reduce_job copy.yml
@@ -0,0 +1,42 @@
apiVersion: batch/v1
kind: Job
metadata:
name: 'batch-reduce-job'
spec:
completions: 52
parallelism: 10
completionMode: Indexed
template:
spec:
restartPolicy: Never
containers:
- name: 'worker'
image: 'stellar/lighthorizon-index-batch'
imagePullPolicy: Always
envFrom:
- secretRef:
name: <reference to secret name here if needed for source/target>
env:
- name: RUN_MODE
value: "reduce"
- name: MAP_JOB_COUNT
value: "52"
- name: REDUCE_JOB_COUNT
value: "52"
- name: WORKER_COUNT
value: "8"
- name: INDEX_SOURCE_ROOT
value: "<url of index location>"
- name: JOB_INDEX_ENV
value: JOB_COMPLETION_INDEX
- name: INDEX_TARGET
value: "<url of index location>"
resources:
limits:
cpu: 4
memory: 5Gi
requests:
cpu: 500m
memory: 500Mi


6 changes: 6 additions & 0 deletions exp/lighthorizon/index/backend/file.go
@@ -25,6 +25,12 @@ func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) {
parallel = 1
}

err := os.MkdirAll(dir, fs.ModeDir|0755)
if err != nil {
log.Errorf("Unable to mkdir %s, %v", dir, err)
return nil, err
}

> **Contributor**: This is a behavior change. I'm for it, but can we use this as a chance to add a docstring? e.g.
>
> NewFileBackend connects to indices stored at `dir`, creating the directory if one doesn't exist, and uses `parallel` to control how many workers to use when flushing to disk.

return &FileBackend{
dir: dir,
parallel: parallel,
6 changes: 5 additions & 1 deletion exp/lighthorizon/index/cmd/batch/map/main.go
@@ -24,7 +24,7 @@ type BatchConfig struct {

const (
batchSizeEnv = "BATCH_SIZE"
jobIndexEnv = "AWS_BATCH_JOB_ARRAY_INDEX"
jobIndexEnvName = "JOB_INDEX_ENV"
firstCheckpointEnv = "FIRST_CHECKPOINT"
txmetaSourceUrlEnv = "TXMETA_SOURCE"
indexTargetUrlEnv = "INDEX_TARGET"
@@ -39,6 +39,10 @@ func NewBatchConfig() (*BatchConfig, error) {
return nil, errors.New("required parameter: " + indexTargetUrlEnv)
}

jobIndexEnv := os.Getenv(jobIndexEnvName)
if jobIndexEnv == "" {
return nil, errors.New("env variable can't be empty " + jobIndexEnvName)
}
jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
7 changes: 6 additions & 1 deletion exp/lighthorizon/index/cmd/batch/reduce/main.go
@@ -37,11 +37,16 @@ func ReduceConfigFromEnvironment() (*ReduceConfig, error) {
mapJobsEnv = "MAP_JOB_COUNT"
reduceJobsEnv = "REDUCE_JOB_COUNT"
workerCountEnv = "WORKER_COUNT"
jobIndexEnv = "AWS_BATCH_JOB_ARRAY_INDEX"
jobIndexEnvName = "JOB_INDEX_ENV"
indexRootSourceEnv = "INDEX_SOURCE_ROOT"
indexTargetEnv = "INDEX_TARGET"
)

jobIndexEnv := os.Getenv(jobIndexEnvName)
if jobIndexEnv == "" {
return nil, errors.New("env variable can't be empty " + jobIndexEnvName)
}

jobIndex, err := strconv.ParseUint(strings.TrimSpace(os.Getenv(jobIndexEnv)), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/cmd/map.sh
@@ -68,7 +68,7 @@ for (( i=0; i < $BATCH_COUNT; i++ ))
do
echo -n "Creating map job $i... "

NETWORK_PASSPHRASE='testnet' MODULES='accounts_unbacked,transactions' \
NETWORK_PASSPHRASE='testnet' JOB_INDEX_ENV='AWS_BATCH_JOB_ARRAY_INDEX' MODULES='accounts_unbacked,transactions' \
> **Contributor**: Same comment here as on the other PR: why do we need the name of the variable rather than just passing the value itself? e.g. JOB_INDEX=$AWS_BATCH_JOB_ARRAY_INDEX?
>
> **Contributor (Author)**: Hmm, good point; this would be a good simplification, might squeeze it in.
>
> **Contributor (Author)**: Oh, I think I see why. Under different orchestration environments such as k8s and AWS Batch, the orchestration layer injects the job index as a container env variable whose name varies: AWS_BATCH_JOB_ARRAY_INDEX on AWS, JOB_COMPLETION_INDEX on k8s. There is no shell layer for interpolation like there is here, so the only realistic way for app code launched in a container on the orchestration layer to find the value is to be given the known env var name for the job index, and then perform a second lookup against that name in the environment to get the value.

AWS_BATCH_JOB_ARRAY_INDEX=$i BATCH_SIZE=$BATCH_SIZE FIRST_CHECKPOINT=$FIRST \
TXMETA_SOURCE=file://$1 INDEX_TARGET=file://$2 WORKER_COUNT=1 \
./map &
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/cmd/reduce.sh
@@ -47,7 +47,7 @@ for (( i=0; i < $REDUCE_JOB_COUNT; i++ ))
do
echo -n "Creating reduce job $i... "

AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$MAP_JOB_COUNT \
AWS_BATCH_JOB_ARRAY_INDEX=$i JOB_INDEX_ENV="AWS_BATCH_JOB_ARRAY_INDEX" MAP_JOB_COUNT=$MAP_JOB_COUNT \
REDUCE_JOB_COUNT=$REDUCE_JOB_COUNT WORKER_COUNT=4 \
INDEX_SOURCE_ROOT=file://$1 INDEX_TARGET=file://$2 \
timeout -k 30s 10s ./reduce &