Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tpch + tpcds GHA launcher #3619

Merged
merged 18 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions benchmarking/tpch/ray_job_runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "ray[default]",
# "getdaft",
# ]
# ///

raunakab marked this conversation as resolved.
Show resolved Hide resolved
from __future__ import annotations

import argparse
Expand Down
70 changes: 70 additions & 0 deletions tools/tpcds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "PyGithub",
# "boto3",
# ]
# ///

import argparse
import json

import github
import utils


def run(
    branch_name: str,
    questions: str,
    scale_factor: int,
    cluster_profile: str,
    env_vars: str,
):
    """Dispatch the TPC-DS benchmark GitHub Actions workflow.

    Args:
        branch_name: Branch or ref to launch the workflow on (resolved via git).
        questions: Question selection string ("*", "3", "1,4", "2-5", ...).
        scale_factor: TPC-DS scale factor in GB.
        cluster_profile: Ray cluster configuration profile to run on.
        env_vars: Comma-separated environment variables forwarded to the ray job.
    """
    # TPC-DS defines exactly 99 queries; named here for clarity, mirroring
    # TOTAL_NUMBER_OF_QUESTIONS in tools/tpch.py instead of a bare magic number.
    total_number_of_questions = 99

    branch_name, _ = utils.get_name_and_commit_hash(branch_name)

    expanded_questions = utils.parse_questions(questions, total_number_of_questions)
    print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, expanded_questions))}")
    # One entrypoint invocation per question; serialized as JSON for the workflow input.
    args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions]
    entrypoint_args = json.dumps(args_as_list)

    workflow = utils.repo.get_workflow("run-cluster.yaml")
    utils.dispatch(
        workflow=workflow,
        branch_name=branch_name,
        inputs={
            "cluster_profile": cluster_profile,
            "working_dir": "benchmarking/tpcds",
            "entrypoint_script": "ray_entrypoint.py",
            "entrypoint_args": entrypoint_args,
            "env_vars": env_vars,
        },
    )


if __name__ == "__main__":
    # CLI entrypoint: parse flags and launch the TPC-DS benchmark workflow.
    cli = argparse.ArgumentParser()
    cli.add_argument("--ref", type=str, required=False, help="The branch name to run on")
    cli.add_argument(
        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
    )
    cli.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
    cli.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
    cli.add_argument(
        "--env-vars",
        type=str,
        required=False,
        help="A comma separated list of environment variables to pass to ray job",
    )
    cli.add_argument("--verbose", action="store_true", help="Verbose debugging")
    parsed = cli.parse_args()

    # Surface PyGithub's HTTP traffic when debugging dispatch problems.
    if parsed.verbose:
        github.enable_console_debug_logging()

    run(
        branch_name=parsed.ref,
        questions=parsed.questions,
        scale_factor=parsed.scale_factor,
        cluster_profile=parsed.cluster_profile,
        env_vars=parsed.env_vars,
    )
80 changes: 80 additions & 0 deletions tools/tpch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "PyGithub",
# "boto3",
# ]
# ///

import argparse
import json

import github
import utils

TOTAL_NUMBER_OF_QUESTIONS = 22


def run(
    branch_name: str,
    questions: str,
    scale_factor: int,
    num_partitions: int,
    cluster_profile: str,
    env_vars: str,
):
    """Dispatch the TPC-H benchmark GitHub Actions workflow.

    Args:
        branch_name: Branch or ref to launch the workflow on (resolved via git).
        questions: Question selection string ("*", "3", "1,4", "2-5", ...).
        scale_factor: TPC-H scale factor in GB.
        num_partitions: Partition count of the pre-generated parquet fixtures.
        cluster_profile: Ray cluster configuration profile to run on.
        env_vars: Comma-separated environment variables forwarded to the ray job.
    """
    branch_name, _ = utils.get_name_and_commit_hash(branch_name)

    question_numbers = utils.parse_questions(questions, TOTAL_NUMBER_OF_QUESTIONS)
    print(
        f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, question_numbers))}"
    )
    # Benchmark fixtures are keyed by scale factor and partition count in S3.
    parquet_folder = (
        f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpch-dbgen/{scale_factor}_0/{num_partitions}/parquet/"
    )
    # One entrypoint invocation per question; serialized as JSON for the workflow input.
    entrypoint_args = json.dumps(
        [f"--question-number={q} --parquet-folder={parquet_folder}" for q in question_numbers]
    )

    utils.dispatch(
        workflow=utils.repo.get_workflow("run-cluster.yaml"),
        branch_name=branch_name,
        inputs={
            "cluster_profile": cluster_profile,
            "working_dir": "benchmarking/tpch",
            "entrypoint_script": "ray_job_runner.py",
            "entrypoint_args": entrypoint_args,
            "env_vars": env_vars,
        },
    )


if __name__ == "__main__":
    # CLI entrypoint: parse flags and launch the TPC-H benchmark workflow.
    cli = argparse.ArgumentParser()
    cli.add_argument("--ref", type=str, required=False, help="The branch name to run on")
    cli.add_argument(
        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
    )
    cli.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
    cli.add_argument("--num-partitions", type=int, required=False, default=2, help="The number of partitions")
    cli.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
    cli.add_argument(
        "--env-vars",
        type=str,
        required=False,
        help="A comma separated list of environment variables to pass to ray job",
    )
    cli.add_argument("--verbose", action="store_true", help="Verbose debugging")
    parsed = cli.parse_args()

    # Surface PyGithub's HTTP traffic when debugging dispatch problems.
    if parsed.verbose:
        github.enable_console_debug_logging()

    run(
        branch_name=parsed.ref,
        questions=parsed.questions,
        scale_factor=parsed.scale_factor,
        num_partitions=parsed.num_partitions,
        cluster_profile=parsed.cluster_profile,
        env_vars=parsed.env_vars,
    )
116 changes: 116 additions & 0 deletions tools/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import subprocess
import time
import typing
from typing import Optional

import gha_run_cluster_job
from github import Auth, Github
from github.Workflow import Workflow
from github.WorkflowRun import WorkflowRun

# Maximum polling attempts when waiting on the GitHub API (see dispatch / get_latest_run).
RETRY_ATTEMPTS = 5

# Module-level GitHub client, authenticated at import time with an OAuth token
# obtained from gha_run_cluster_job; `repo` is the handle to the Daft repository.
auth = Auth.Token(gha_run_cluster_job.get_oauth_token())
g = Github(auth=auth)
repo = g.get_repo("Eventual-Inc/Daft")


def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun:
    """Trigger `workflow` on `branch_name` and return the newly created run.

    GitHub's create_dispatch API does not return the run it creates, so we
    snapshot the latest run number before dispatching and then poll until a run
    with a strictly larger number appears (run numbers are monotonically
    increasing).

    Raises:
        RuntimeError: if the dispatch request fails, or the new run does not
            appear within RETRY_ATTEMPTS polls.
    """
    pre_creation_latest_run = get_latest_run(workflow)

    print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}'")
    created = workflow.create_dispatch(
        ref=branch_name,
        inputs=inputs,
    )
    if not created:
        raise RuntimeError("Could not create workflow, suggestion: run again with --verbose")

    # Bug fix: the original kept the last polled run even when no NEW run ever
    # appeared, so on timeout it silently returned the stale pre-dispatch run.
    # Track the new run explicitly and raise if polling is exhausted.
    new_run = None
    for _ in range(RETRY_ATTEMPTS):
        latest_run = get_latest_run(workflow)
        if latest_run.run_number > pre_creation_latest_run.run_number:
            new_run = latest_run
            break
        sleep_and_then_retry()
    if new_run is None:
        raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow")

    print(f"Launched new '{workflow.name}' workflow with id: {new_run.id}")
    print(f"View the workflow run at: {new_run.html_url}")

    return new_run


def sleep_and_then_retry(sleep_amount_sec: int = 3) -> None:
    """Pause for `sleep_amount_sec` seconds between GitHub API polling attempts."""
    time.sleep(sleep_amount_sec)


def get_latest_run(workflow: Workflow) -> WorkflowRun:
    """Return the most recent run of `workflow`, polling up to RETRY_ATTEMPTS times.

    Raises:
        RuntimeError: if no runs are listed after all attempts.
    """
    attempts_remaining = RETRY_ATTEMPTS
    while attempts_remaining > 0:
        run_page = workflow.get_runs()
        if run_page.totalCount > 0:
            # Index 0 is treated as the latest run (presumably newest-first
            # ordering from the GitHub API — see dispatch's run-number logic).
            return run_page[0]
        sleep_and_then_retry()
        attempts_remaining -= 1

    raise RuntimeError("Unable to list all workflow invocations")


def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]:
    """Resolve `branch_name` (defaulting to HEAD) to (ref name, commit hash) via git."""
    ref = branch_name if branch_name else "HEAD"

    def _rev_parse(*flags: str) -> str:
        # Shell out to git; stderr is folded into the captured output.
        raw = subprocess.check_output(["git", "rev-parse", *flags], stderr=subprocess.STDOUT)
        return raw.strip().decode("utf-8")

    name = _rev_parse("--abbrev-ref", ref)
    commit_hash = _rev_parse(ref)
    return name, commit_hash
raunakab marked this conversation as resolved.
Show resolved Hide resolved


def parse_questions(questions: str, total_number_of_questions: int) -> list[int]:
    """Expand a question-selection string into a list of question numbers.

    Accepted forms: "" (empty -> no questions), "*" (all questions from 1 to
    `total_number_of_questions`), or a comma-separated list of items where each
    item is either a single number ("3") or an inclusive range ("2-5").

    Raises:
        RuntimeError: if any requested question exceeds `total_number_of_questions`.
        ValueError: if an item is neither a number nor a valid low-high range.
    """
    if not questions:
        return []

    if questions == "*":
        return list(range(1, total_number_of_questions + 1))

    nums: list[int] = []
    for item in questions.split(","):
        try:
            num = int(item)
        except ValueError:
            pass
        else:
            if num > total_number_of_questions:
                raise RuntimeError(
                    f"Requested question number ({num}) is greater than the total number of questions available ({total_number_of_questions})"
                )
            # Bug fix: the original appended str(num) here, producing a mixed
            # list of strs and ints despite the declared list[int] return type
            # (the range branch below extends with ints).
            nums.append(num)
            continue

        if "-" not in item:
            raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}")

        try:
            lower, upper = item.split("-")
            lower_int = int(lower)
            upper_int = int(upper)
            if lower_int > upper_int:
                raise ValueError
        except ValueError:
            raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}")
        if upper_int > total_number_of_questions:
            raise RuntimeError(
                f"Requested question number ({upper_int}) is greater than the total number of questions available ({total_number_of_questions})"
            )
        nums.extend(range(lower_int, upper_int + 1))

    return nums
raunakab marked this conversation as resolved.
Show resolved Hide resolved
Loading