Skip to content

Commit

Permalink
Array-ify the run-cluster script
Browse files Browse the repository at this point in the history
  • Loading branch information
raunakab committed Dec 16, 2024
1 parent 6c21917 commit 735b16c
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 22 deletions.
123 changes: 123 additions & 0 deletions .github/ci-scripts/job_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import argparse
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import duckdb
from ray.job_submission import JobStatus, JobSubmissionClient


def parse_env_var_str(env_var_str: str) -> dict:
iter = map(
lambda s: s.strip().split("="),
filter(lambda s: s, env_var_str.split(",")),
)
return {k: v for k, v in iter}


async def print_logs(logs):
async for lines in logs:
print(lines, end="")


async def wait_on_job(logs, timeout_s):
await asyncio.wait_for(print_logs(logs), timeout=timeout_s)


def generate_data():
datadir = Path(__file__).parents[2] / "gendata"
datadir.mkdir(parents=True, exist_ok=True)
scale_factor = 0.01
db = duckdb.connect(database=datadir / "tpcds.db")
db.sql(f"call dsdgen(sf = {scale_factor})")
for item in db.sql("show tables").fetchall():
tbl = item[0]
parquet_file = datadir / f"{tbl}.parquet"
print(f"Exporting {tbl} to {parquet_file}")
db.sql(f"COPY {tbl} TO '{parquet_file}'")


@dataclass
class Result:
query: int
duration: timedelta
error_msg: Optional[str]


def submit_job(
working_dir: Path,
entrypoint_script: str,
entrypoint_args: str,
env_vars: str,
enable_ray_tracing: bool,
):
generate_data()

env_vars_dict = parse_env_var_str(env_vars)
if enable_ray_tracing:
env_vars_dict["DAFT_ENABLE_RAY_TRACING"] = "1"

client = JobSubmissionClient(address="http://localhost:8265")

if entrypoint_args.startswith("[") and entrypoint_args.endswith("]"):
# this is a json-encoded list of strings; parse accordingly
list_of_entrypoint_args: list[str] = json.loads(entrypoint_args)
else:
list_of_entrypoint_args: list[str] = [entrypoint_args]

results = []

for index, args in enumerate(list_of_entrypoint_args):
entrypoint = f"DAFT_RUNNER=ray python {entrypoint_script} {args}"
print(f"{entrypoint=}")
start = datetime.now()
job_id = client.submit_job(
entrypoint=entrypoint,
runtime_env={
"working_dir": working_dir,
"env_vars": env_vars_dict,
},
)

asyncio.run(wait_on_job(client.tail_job_logs(job_id), timeout_s=60 * 30))

status = client.get_job_status(job_id)
assert status.is_terminal(), "Job should have terminated"
end = datetime.now()
duration = end - start
error_msg = None
if status != JobStatus.SUCCEEDED:
job_info = client.get_job_info(job_id)
error_msg = job_info.message

result = Result(query=index, duration=duration, error_msg=error_msg)
results.append(result)

print(f"{results=}")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--working-dir", type=Path, required=True)
parser.add_argument("--entrypoint-script", type=str, required=True)
parser.add_argument("--entrypoint-args", type=str, required=True)
parser.add_argument("--env-vars", type=str, required=True)
parser.add_argument("--enable-ray-tracing", action="store_true")

args = parser.parse_args()

working_dir: Path = args.working_dir
assert working_dir.exists() and working_dir.is_dir(), "The working dir must exist and be directory"
entrypoint: Path = working_dir / args.entrypoint_script
assert entrypoint.exists() and entrypoint.is_file(), "The entrypoint script must exist and be a file"

submit_job(
working_dir=working_dir,
entrypoint_script=args.entrypoint_script,
entrypoint_args=args.entrypoint_args,
env_vars=args.env_vars,
enable_ray_tracing=args.enable_ray_tracing,
)
2 changes: 2 additions & 0 deletions .github/ci-scripts/templatize_ray_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,7 @@ class Metadata(BaseModel, extra="allow"):
if metadata:
metadata = Metadata(**metadata)
content = content.replace(OTHER_INSTALL_PLACEHOLDER, " ".join(metadata.dependencies))
else:
content = content.replace(OTHER_INSTALL_PLACEHOLDER, "")

print(content)
37 changes: 15 additions & 22 deletions .github/workflows/run-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,23 @@ jobs:
run: |
uv v
source .venv/bin/activate
uv pip install ray[default] boto3
uv pip install ray[default] boto3 duckdb
- name: Dynamically update ray config file
run: |
source .venv/bin/activate
(cat .github/assets/template.yaml | \
uv run \
--python 3.12 \
.github/ci-scripts/templatize_ray_config.py \
--cluster-name "ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
--daft-wheel-url '${{ inputs.daft_wheel_url }}' \
--daft-version '${{ inputs.daft_version }}' \
--python-version '${{ inputs.python_version }}' \
--cluster-profile '${{ inputs.cluster_profile }}' \
--working-dir '${{ inputs.working_dir }}' \
--entrypoint-script '${{ inputs.entrypoint_script }}'
--cluster-name="ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
--daft-wheel-url='${{ inputs.daft_wheel_url }}' \
--daft-version='${{ inputs.daft_version }}' \
--python-version='${{ inputs.python_version }}' \
--cluster-profile='${{ inputs.cluster_profile }}' \
--working-dir='${{ inputs.working_dir }}' \
--entrypoint-script='${{ inputs.entrypoint_script }}'
) >> .github/assets/ray.yaml
cat .github/assets/ray.yaml
- name: Setup ray env vars
run: |
source .venv/bin/activate
ray_env_var=$(python .github/ci-scripts/format_env_vars.py \
--env-vars '${{ inputs.env_vars }}' \
--enable-ray-tracing \
)
echo $ray_env_var
echo "ray_env_var=$ray_env_var" >> $GITHUB_ENV
- name: Download private ssh key
run: |
KEY=$(aws secretsmanager get-secret-value --secret-id ci-github-actions-ray-cluster-key-3 --query SecretString --output text)
Expand All @@ -117,11 +108,13 @@ jobs:
echo 'Invalid command submitted; command cannot be empty'
exit 1
fi
ray job submit \
--working-dir ${{ inputs.working_dir }} \
--address http://localhost:8265 \
--runtime-env-json "$ray_env_var" \
-- python ${{ inputs.entrypoint_script }} ${{ inputs.entrypoint_args }}
rm -rf daft
python .github/ci-scripts/job_runner.py \
--working-dir='${{ inputs.working_dir }}' \
--entrypoint-script='${{ inputs.entrypoint_script }}' \
--entrypoint-args='${{ inputs.entrypoint_args }}' \
--env-vars='${{ inputs.env_vars }}' \
--enable-ray-tracing
- name: Download log files from ray cluster
run: |
source .venv/bin/activate
Expand Down
5 changes: 5 additions & 0 deletions benchmarking/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import daft

df = daft.from_pydict({"nums": [1, 2, 3]})

df.show()
6 changes: 6 additions & 0 deletions benchmarking/tpcds/ray_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ def run(
)
args = parser.parse_args()

d = Path(".")
assert d.exists()
for x in d.iterdir():
print(f"subpath: {x}")

tpcds_gen_folder: Path = args.tpcds_gen_folder
print(f"{tpcds_gen_folder=}")
assert tpcds_gen_folder.exists()
assert args.question in range(1, 100)

Expand Down

0 comments on commit 735b16c

Please sign in to comment.