Skip to content

Commit

Permalink
feat: tpch + tpcds GHA launcher (#3619)
Browse files Browse the repository at this point in the history
# Overview

This PR adds a "tpch" and "tpcds" launcher to the available tools.
Allows you to easily scale up a ray-cluster and run queries against it.

## Usage

In order to run tpch, run the following:

```sh
uv run tools/tpch.py --scale-factor=2 --num-partitions=2 --questions='1-10'
```

In order to run tpcds, run the following:

```sh
uv run tools/tpcds.py --scale-factor=100 --questions='1-10'
```

As always, if you want help, run `uv run tools/tpch.py --help` or `uv
run tools/tpcds.py --help`.
  • Loading branch information
raunakab authored Jan 2, 2025
1 parent e59581c commit 39bb62c
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 0 deletions.
8 changes: 8 additions & 0 deletions benchmarking/tpch/ray_job_runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "ray[default]",
# "getdaft",
# ]
# ///

from __future__ import annotations

import argparse
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ venvPath = "."
[[tool.pyright.executionEnvironments]]
root = ".github/ci-scripts"

[[tool.pyright.executionEnvironments]]
root = "tools"

[tool.pytest.ini_options]
addopts = "-m 'not (integration or benchmark or hypothesis)'"
minversion = "6.0"
Expand Down
91 changes: 91 additions & 0 deletions tools/git_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import subprocess
import time
import typing
from typing import Optional

import gha_run_cluster_job
from github import Auth, Github
from github.Workflow import Workflow
from github.WorkflowRun import WorkflowRun

# Number of polling attempts before giving up (used by dispatch/get_latest_run).
RETRY_ATTEMPTS = 5

# Module-level GitHub client. NOTE: authentication happens at import time via the
# OAuth token resolved by gha_run_cluster_job, so importing this module requires
# valid credentials. All helpers below operate against the Daft repository.
auth = Auth.Token(gha_run_cluster_job.get_oauth_token())
g = Github(auth=auth)
repo = g.get_repo("Eventual-Inc/Daft")


def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun:
    """Trigger a workflow_dispatch of `workflow` on `branch_name` and return the new run.

    GitHub's create_dispatch API does not return the created run, so we snapshot the
    latest run before dispatching and then poll until a run with a strictly greater
    run_number appears (run numbers are monotonically increasing).

    Args:
        workflow: the workflow to dispatch.
        branch_name: the git ref to run the workflow on.
        inputs: the workflow_dispatch inputs.

    Returns:
        The newly created WorkflowRun.

    Raises:
        RuntimeError: if the dispatch could not be created, or the new run does not
            appear within RETRY_ATTEMPTS polls.
    """
    pre_creation_latest_run = get_latest_run(workflow)

    print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}'")
    created = workflow.create_dispatch(
        ref=branch_name,
        inputs=inputs,
    )
    if not created:
        raise RuntimeError("Could not create workflow, suggestion: run again with --verbose")

    # Poll for a run newer than the pre-dispatch snapshot. Bug fix vs. the original:
    # previously, if no new run ever appeared, the loop left the *old* run in the
    # variable (truthy), so the error below never fired and the stale run was
    # silently returned. Track the new run explicitly instead.
    new_run: Optional[WorkflowRun] = None
    for _ in range(RETRY_ATTEMPTS):
        latest_run = get_latest_run(workflow)
        if latest_run.run_number > pre_creation_latest_run.run_number:
            new_run = latest_run
            break
        # Not registered yet (run numbers never decrease) — wait and poll again.
        sleep_and_then_retry()
    if new_run is None:
        raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow")

    print(f"Launched new '{workflow.name}' workflow with id: {new_run.id}")
    print(f"View the workflow run at: {new_run.html_url}")

    return new_run


def sleep_and_then_retry(sleep_amount_sec: int = 3) -> None:
    """Block the current thread briefly so the caller can retry after a short backoff.

    Args:
        sleep_amount_sec: number of seconds to sleep before returning (default 3).
    """
    time.sleep(sleep_amount_sec)


def get_latest_run(workflow: Workflow) -> WorkflowRun:
    """Return the most recent run of `workflow`, polling up to RETRY_ATTEMPTS times.

    Assumes the GitHub API lists runs newest-first (runs[0] is the latest).

    Raises:
        RuntimeError: if no runs are listed after all attempts.
    """
    attempts_remaining = RETRY_ATTEMPTS
    while attempts_remaining > 0:
        attempts_remaining -= 1
        runs = workflow.get_runs()
        if runs.totalCount > 0:
            return runs[0]
        # No runs visible yet — back off and poll again.
        sleep_and_then_retry()
    raise RuntimeError("Unable to list all workflow invocations")


def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]:
    """Resolve `branch_name` (or HEAD when None) to its short ref name and commit hash.

    Args:
        branch_name: a git ref; None means the currently checked-out HEAD.

    Returns:
        A tuple of (abbreviated ref name, full commit hash), as reported by `git rev-parse`.
    """
    ref = branch_name or "HEAD"

    def _rev_parse(*flags: str) -> str:
        # stderr is folded into stdout so git's error text surfaces in the exception.
        raw = subprocess.check_output(["git", "rev-parse", *flags, ref], stderr=subprocess.STDOUT)
        return raw.strip().decode("utf-8")

    return _rev_parse("--abbrev-ref"), _rev_parse()


def parse_questions(questions: Optional[str], total_number_of_questions: int) -> list[int]:
    """Parse a comma-separated list of question numbers, defaulting to all questions.

    Benchmark questions are 1-based (TPC-H: 1-22, TPC-DS: 1-99). Bug fix vs. the
    original: the None case previously returned `range(total)` — i.e. questions
    0..N-1 — requesting a nonexistent question 0 and skipping the last question.
    Validation now also rejects 0 and negatives, which the old `> total` check let
    through.

    Args:
        questions: comma-separated question numbers (e.g. "1,2,10"); empty entries
            are ignored. None means "run every question".
        total_number_of_questions: the highest valid question number.

    Returns:
        The list of requested question numbers.

    Raises:
        ValueError: if an entry is not an integer or is outside [1, total_number_of_questions].
    """
    if questions is None:
        return list(range(1, total_number_of_questions + 1))

    def to_int(q: str) -> int:
        question = int(q)
        if not (1 <= question <= total_number_of_questions):
            raise ValueError(
                f"Question number should be between 1 and {total_number_of_questions}, instead got {question}"
            )
        return question

    return [to_int(q) for q in questions.split(",") if q]
97 changes: 97 additions & 0 deletions tools/tpcds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "PyGithub",
# "boto3",
# ]
# ///

import argparse
import json
from typing import Optional

import git_utils
import github


def run(
    branch_name: str,
    questions: Optional[str],
    scale_factor: int,
    cluster_profile: str,
    env_vars: Optional[str],
):
    """Dispatch the run-cluster workflow to execute TPC-DS queries on a ray cluster.

    Args:
        branch_name: git ref to run against (resolved via git rev-parse).
        questions: comma-separated TPC-DS question numbers; None runs all 99.
        scale_factor: dataset scale factor in GB.
        cluster_profile: which ray cluster configuration to launch.
        env_vars: comma-separated KEY=VALUE pairs forwarded to the workflow, or None.
    """
    branch_name, _ = git_utils.get_name_and_commit_hash(branch_name)

    question_numbers = git_utils.parse_questions(questions, 99)
    print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, question_numbers))}")
    # One entrypoint invocation per question, serialized as a JSON list of arg strings.
    entrypoint_args = json.dumps([f"--question={q} --scale-factor={scale_factor}" for q in question_numbers])

    git_utils.dispatch(
        workflow=git_utils.repo.get_workflow("run-cluster.yaml"),
        branch_name=branch_name,
        inputs={
            "cluster_profile": cluster_profile,
            "working_dir": "benchmarking/tpcds",
            "entrypoint_script": "ray_entrypoint.py",
            "entrypoint_args": entrypoint_args,
            "env_vars": env_vars,
        },
    )


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
parser.add_argument("--questions", type=str, required=False, help="A comma separated list of questions to run")
parser.add_argument(
"--scale-factor",
choices=[
2,
5,
10,
100,
1000,
],
type=int,
required=False,
default=2,
help="The scale factor to run on",
)
parser.add_argument(
"--cluster-profile",
choices=["debug_xs-x86", "medium-x86"],
type=str,
required=False,
help="The ray cluster configuration to run on",
)
parser.add_argument(
"--env-var",
type=str,
action="append",
required=False,
help="Environment variable in the format KEY=VALUE. Can be specified multiple times.",
)
parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
args = parser.parse_args()

if args.verbose:
github.enable_console_debug_logging()

env_vars = None
if args.env_var:
list_of_env_vars: list[str] = args.env_var
for env_var in list_of_env_vars:
if "=" not in env_var:
raise ValueError("Environment variables must in the form `KEY=VALUE`")
env_vars = ",".join(list_of_env_vars)

run(
branch_name=args.ref,
questions=args.questions,
scale_factor=args.scale_factor,
cluster_profile=args.cluster_profile,
env_vars=env_vars,
)
101 changes: 101 additions & 0 deletions tools/tpch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "PyGithub",
# "boto3",
# ]
# ///

import argparse
import json
from typing import Optional

import git_utils
import github

# TPC-H defines 22 benchmark queries; used as the upper bound when parsing --questions.
TOTAL_NUMBER_OF_QUESTIONS = 22


def run(
    branch_name: str,
    questions: Optional[str],
    scale_factor: int,
    num_partitions: int,
    cluster_profile: str,
    env_vars: Optional[str],
):
    """Dispatch the run-cluster workflow to execute TPC-H queries on a ray cluster.

    Args:
        branch_name: git ref to run against (resolved via git rev-parse).
        questions: comma-separated TPC-H question numbers; None runs all of them.
        scale_factor: dataset scale factor in GB.
        num_partitions: number of parquet partitions in the benchmark fixture.
        cluster_profile: which ray cluster configuration to launch.
        env_vars: comma-separated KEY=VALUE pairs forwarded to the workflow, or None.
    """
    branch_name, _ = git_utils.get_name_and_commit_hash(branch_name)

    question_numbers = git_utils.parse_questions(questions, TOTAL_NUMBER_OF_QUESTIONS)
    print(
        f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, question_numbers))}"
    )
    # Pre-generated fixtures live under s3, keyed by scale factor and partition count.
    parquet_folder = f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpch-dbgen/{scale_factor}_0/{num_partitions}/parquet/"
    # One entrypoint invocation per question, serialized as a JSON list of arg strings.
    entrypoint_args = json.dumps(
        [f"--question-number={q} --parquet-folder={parquet_folder}" for q in question_numbers]
    )

    git_utils.dispatch(
        workflow=git_utils.repo.get_workflow("run-cluster.yaml"),
        branch_name=branch_name,
        inputs={
            "cluster_profile": cluster_profile,
            "working_dir": "benchmarking/tpch",
            "entrypoint_script": "ray_job_runner.py",
            "entrypoint_args": entrypoint_args,
            "env_vars": env_vars,
        },
    )


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
parser.add_argument("--questions", type=str, required=False, help="A comma separated list of questions to run")
parser.add_argument(
"--scale-factor",
choices=[2, 10, 100, 1000],
type=int,
required=False,
default=2,
help="The scale factor to run on",
)
parser.add_argument("--num-partitions", type=int, required=True, help="The number of partitions")
parser.add_argument(
"--cluster-profile",
choices=["debug_xs-x86", "medium-x86"],
type=str,
required=False,
help="The ray cluster configuration to run on",
)
parser.add_argument(
"--env-var",
type=str,
action="append",
required=False,
help="Environment variable in the format KEY=VALUE. Can be specified multiple times.",
)
parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
args = parser.parse_args()

if args.verbose:
github.enable_console_debug_logging()

env_vars = None
if args.env_var:
list_of_env_vars: list[str] = args.env_var
for env_var in list_of_env_vars:
if "=" not in env_var:
raise ValueError("Environment variables must in the form `KEY=VALUE`")
env_vars = ",".join(list_of_env_vars)

run(
branch_name=args.ref,
questions=args.questions,
scale_factor=args.scale_factor,
num_partitions=args.num_partitions,
cluster_profile=args.cluster_profile,
env_vars=env_vars,
)

0 comments on commit 39bb62c

Please sign in to comment.