From 735b16caa1e6322a9b5e2f0004eb56cd18d36849 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Fri, 13 Dec 2024 18:09:24 -0800
Subject: [PATCH 01/16] Array-ify the run-cluster script

---
 .github/ci-scripts/job_runner.py            | 123 ++++++++++++++++++++
 .github/ci-scripts/templatize_ray_config.py |   2 +
 .github/workflows/run-cluster.yaml          |  37 +++----
 benchmarking/test.py                        |   5 +
 benchmarking/tpcds/ray_entrypoint.py        |   6 +
 5 files changed, 151 insertions(+), 22 deletions(-)
 create mode 100644 .github/ci-scripts/job_runner.py
 create mode 100644 benchmarking/test.py

diff --git a/.github/ci-scripts/job_runner.py b/.github/ci-scripts/job_runner.py
new file mode 100644
index 0000000000..914aae79fc
--- /dev/null
+++ b/.github/ci-scripts/job_runner.py
@@ -0,0 +1,123 @@
+import argparse
+import asyncio
+import json
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Optional
+
+import duckdb
+from ray.job_submission import JobStatus, JobSubmissionClient
+
+
+def parse_env_var_str(env_var_str: str) -> dict:
+    iter = map(
+        lambda s: s.strip().split("="),
+        filter(lambda s: s, env_var_str.split(",")),
+    )
+    return {k: v for k, v in iter}
+
+
+async def print_logs(logs):
+    async for lines in logs:
+        print(lines, end="")
+
+
+async def wait_on_job(logs, timeout_s):
+    await asyncio.wait_for(print_logs(logs), timeout=timeout_s)
+
+
+def generate_data():
+    datadir = Path(__file__).parents[2] / "gendata"
+    datadir.mkdir(parents=True, exist_ok=True)
+    scale_factor = 0.01
+    db = duckdb.connect(database=datadir / "tpcds.db")
+    db.sql(f"call dsdgen(sf = {scale_factor})")
+    for item in db.sql("show tables").fetchall():
+        tbl = item[0]
+        parquet_file = datadir / f"{tbl}.parquet"
+        print(f"Exporting {tbl} to {parquet_file}")
+        db.sql(f"COPY {tbl} TO '{parquet_file}'")
+
+
+@dataclass
+class Result:
+    query: int
+    duration: timedelta
+    error_msg: Optional[str]
+
+
+def submit_job(
+    working_dir: Path,
+    entrypoint_script: str,
+    entrypoint_args: str,
+    env_vars: str,
+    enable_ray_tracing: bool,
+):
+    generate_data()
+
+    env_vars_dict = parse_env_var_str(env_vars)
+    if enable_ray_tracing:
+        env_vars_dict["DAFT_ENABLE_RAY_TRACING"] = "1"
+
+    client = JobSubmissionClient(address="http://localhost:8265")
+
+    if entrypoint_args.startswith("[") and entrypoint_args.endswith("]"):
+        # this is a json-encoded list of strings; parse accordingly
+        list_of_entrypoint_args: list[str] = json.loads(entrypoint_args)
+    else:
+        list_of_entrypoint_args: list[str] = [entrypoint_args]
+
+    results = []
+
+    for index, args in enumerate(list_of_entrypoint_args):
+        entrypoint = f"DAFT_RUNNER=ray python {entrypoint_script} {args}"
+        print(f"{entrypoint=}")
+        start = datetime.now()
+        job_id = client.submit_job(
+            entrypoint=entrypoint,
+            runtime_env={
+                "working_dir": working_dir,
+                "env_vars": env_vars_dict,
+            },
+        )
+
+        asyncio.run(wait_on_job(client.tail_job_logs(job_id), timeout_s=60 * 30))
+
+        status = client.get_job_status(job_id)
+        assert status.is_terminal(), "Job should have terminated"
+        end = datetime.now()
+        duration = end - start
+        error_msg = None
+        if status != JobStatus.SUCCEEDED:
+            job_info = client.get_job_info(job_id)
+            error_msg = job_info.message
+
+        result = Result(query=index, duration=duration, error_msg=error_msg)
+        results.append(result)
+
+    print(f"{results=}")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--working-dir", type=Path, required=True)
+    parser.add_argument("--entrypoint-script", type=str, required=True)
+    parser.add_argument("--entrypoint-args", type=str, required=True)
+    parser.add_argument("--env-vars", type=str, required=True)
+    parser.add_argument("--enable-ray-tracing", action="store_true")
+
+    args = parser.parse_args()
+
+    working_dir: Path = args.working_dir
+    assert working_dir.exists() and working_dir.is_dir(), "The working dir must exist and be directory"
+    entrypoint: Path = working_dir / args.entrypoint_script
+    assert entrypoint.exists() and entrypoint.is_file(), "The entrypoint script must exist and be a file"
+
+    submit_job(
+        working_dir=working_dir,
+        entrypoint_script=args.entrypoint_script,
+        entrypoint_args=args.entrypoint_args,
+        env_vars=args.env_vars,
+        enable_ray_tracing=args.enable_ray_tracing,
+    )
diff --git a/.github/ci-scripts/templatize_ray_config.py b/.github/ci-scripts/templatize_ray_config.py
index 30fe47f995..1608cf8dee 100644
--- a/.github/ci-scripts/templatize_ray_config.py
+++ b/.github/ci-scripts/templatize_ray_config.py
@@ -110,5 +110,7 @@ class Metadata(BaseModel, extra="allow"):
     if metadata:
         metadata = Metadata(**metadata)
         content = content.replace(OTHER_INSTALL_PLACEHOLDER, " ".join(metadata.dependencies))
+    else:
+        content = content.replace(OTHER_INSTALL_PLACEHOLDER, "")
 
 print(content)
diff --git a/.github/workflows/run-cluster.yaml b/.github/workflows/run-cluster.yaml
index 644250d1f1..abc8dc17a4 100644
--- a/.github/workflows/run-cluster.yaml
+++ b/.github/workflows/run-cluster.yaml
@@ -71,7 +71,7 @@ jobs:
       run: |
         uv v
         source .venv/bin/activate
-        uv pip install ray[default] boto3
+        uv pip install ray[default] boto3 duckdb
     - name: Dynamically update ray config file
       run: |
         source .venv/bin/activate
@@ -79,24 +79,15 @@ jobs:
         uv run \
           --python 3.12 \
           .github/ci-scripts/templatize_ray_config.py \
-          --cluster-name "ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
-          --daft-wheel-url '${{ inputs.daft_wheel_url }}' \
-          --daft-version '${{ inputs.daft_version }}' \
-          --python-version '${{ inputs.python_version }}' \
-          --cluster-profile '${{ inputs.cluster_profile }}' \
-          --working-dir '${{ inputs.working_dir }}' \
-          --entrypoint-script '${{ inputs.entrypoint_script }}'
+          --cluster-name="ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
+          --daft-wheel-url='${{ inputs.daft_wheel_url }}' \
+          --daft-version='${{ inputs.daft_version }}' \
+          --python-version='${{ inputs.python_version }}' \
+          --cluster-profile='${{ inputs.cluster_profile }}' \
+          --working-dir='${{ inputs.working_dir }}' \
+          --entrypoint-script='${{ inputs.entrypoint_script }}'
         ) >> .github/assets/ray.yaml
         cat .github/assets/ray.yaml
-    - name: Setup ray env vars
-      run: |
-        source .venv/bin/activate
-        ray_env_var=$(python .github/ci-scripts/format_env_vars.py \
-          --env-vars '${{ inputs.env_vars }}' \
-          --enable-ray-tracing \
-        )
-        echo $ray_env_var
-        echo "ray_env_var=$ray_env_var" >> $GITHUB_ENV
     - name: Download private ssh key
       run: |
         KEY=$(aws secretsmanager get-secret-value --secret-id ci-github-actions-ray-cluster-key-3 --query SecretString --output text)
@@ -117,11 +108,13 @@ jobs:
           echo 'Invalid command submitted; command cannot be empty'
           exit 1
         fi
-        ray job submit \
-          --working-dir ${{ inputs.working_dir }} \
-          --address http://localhost:8265 \
-          --runtime-env-json "$ray_env_var" \
-          -- python ${{ inputs.entrypoint_script }} ${{ inputs.entrypoint_args }}
+        rm -rf daft
+        python .github/ci-scripts/job_runner.py \
+          --working-dir='${{ inputs.working_dir }}' \
+          --entrypoint-script='${{ inputs.entrypoint_script }}' \
+          --entrypoint-args='${{ inputs.entrypoint_args }}' \
+          --env-vars='${{ inputs.env_vars }}' \
+          --enable-ray-tracing
     - name: Download log files from ray cluster
       run: |
         source .venv/bin/activate
diff --git a/benchmarking/test.py b/benchmarking/test.py
new file mode 100644
index 0000000000..7e541e96b4
--- /dev/null
+++ b/benchmarking/test.py
@@ -0,0 +1,5 @@
+import daft
+
+df = daft.from_pydict({"nums": [1, 2, 3]})
+
+df.show()
diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index 10e52c4198..46473e7113 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -42,7 +42,13 @@ def run(
     )
     args = parser.parse_args()
 
+    d = Path(".")
+    assert d.exists()
+    for x in d.iterdir():
+        print(f"subpath: {x}")
+
     tpcds_gen_folder: Path = args.tpcds_gen_folder
+    print(f"{tpcds_gen_folder=}")
     assert tpcds_gen_folder.exists()
     assert args.question in range(1, 100)
 

From 05c51ab07a78850ba491612d104f63baceb12492 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 14:37:56 -0800
Subject: [PATCH 02/16] Remove test file

---
 benchmarking/test.py | 5 -----
 1 file changed, 5 deletions(-)
 delete mode 100644 benchmarking/test.py

diff --git a/benchmarking/test.py b/benchmarking/test.py
deleted file mode 100644
index 7e541e96b4..0000000000
--- a/benchmarking/test.py
+++ /dev/null
@@ -1,5 +0,0 @@
-import daft
-
-df = daft.from_pydict({"nums": [1, 2, 3]})
-
-df.show()

From a744cf2c5a5181d0d4ff746c95b106f628764f1a Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 14:46:03 -0800
Subject: [PATCH 03/16] Change to raising errors instead of performing
 assertions

---
 .github/ci-scripts/job_runner.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/.github/ci-scripts/job_runner.py b/.github/ci-scripts/job_runner.py
index 914aae79fc..e3c49635ca 100644
--- a/.github/ci-scripts/job_runner.py
+++ b/.github/ci-scripts/job_runner.py
@@ -109,13 +109,15 @@ def submit_job(
 
     args = parser.parse_args()
 
-    working_dir: Path = args.working_dir
-    assert working_dir.exists() and working_dir.is_dir(), "The working dir must exist and be directory"
-    entrypoint: Path = working_dir / args.entrypoint_script
-    assert entrypoint.exists() and entrypoint.is_file(), "The entrypoint script must exist and be a file"
+    if not (args.working_dir.exists() and args.working_dir.is_dir()):
+        raise ValueError("The working-dir must exist and be a directory")
+
+    entrypoint: Path = args.working_dir / args.entrypoint_script
+    if not (entrypoint.exists() and entrypoint.is_file()):
+        raise ValueError("The entrypoint script must exist and be a file")
 
     submit_job(
-        working_dir=working_dir,
+        working_dir=args.working_dir,
         entrypoint_script=args.entrypoint_script,
         entrypoint_args=args.entrypoint_args,
         env_vars=args.env_vars,

From 96ec53114479e9001b53ac454763d666a77f7931 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 14:46:47 -0800
Subject: [PATCH 04/16] Remove debug prints

---
 benchmarking/tpcds/ray_entrypoint.py | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index 46473e7113..fe34f10eb1 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -42,14 +42,7 @@ def run(
     )
     args = parser.parse_args()
 
-    d = Path(".")
-    assert d.exists()
-    for x in d.iterdir():
-        print(f"subpath: {x}")
-
-    tpcds_gen_folder: Path = args.tpcds_gen_folder
-    print(f"{tpcds_gen_folder=}")
-    assert tpcds_gen_folder.exists()
+    assert args.tpcds_gen_folder.exists()
     assert args.question in range(1, 100)
 
     run(args.tpcds_gen_folder, args.question, args.dry_run)

From 46ebcabbeefcf721296e438c86f32d41fc6cb8f6 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 15:37:02 -0800
Subject: [PATCH 05/16] Add catalog generation from s3 instead of from local

---
 benchmarking/tpcds/ray_entrypoint.py | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index fe34f10eb1..6fb3ea6732 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -7,11 +7,10 @@
 
 
 def run(
-    parquet_folder: Path,
     question: int,
     dry_run: bool,
 ):
-    catalog = helpers.generate_catalog(parquet_folder)
+    catalog = helpers.generate_catalog("s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/2/")
     query_file = Path(__file__).parent / "queries" / f"{question:02}.sql"
     with open(query_file) as f:
         query = f.read()
@@ -23,12 +22,6 @@ def run(
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--tpcds-gen-folder",
-        required=True,
-        type=Path,
-        help="Path to the TPC-DS data generation folder",
-    )
     parser.add_argument(
         "--question",
         required=True,
@@ -42,7 +35,7 @@ def run(
     )
     args = parser.parse_args()
 
-    assert args.tpcds_gen_folder.exists()
+    # assert args.tpcds_gen_folder.exists()
     assert args.question in range(1, 100)
 
     run(args.tpcds_gen_folder, args.question, args.dry_run)

From b3da1a349e061d55a609d4c7f77226dc73407043 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 15:39:07 -0800
Subject: [PATCH 06/16] Remove data-gen

---
 .github/ci-scripts/job_runner.py | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/.github/ci-scripts/job_runner.py b/.github/ci-scripts/job_runner.py
index e3c49635ca..f216fa39ed 100644
--- a/.github/ci-scripts/job_runner.py
+++ b/.github/ci-scripts/job_runner.py
@@ -6,7 +6,6 @@
 from pathlib import Path
 from typing import Optional
 
-import duckdb
 from ray.job_submission import JobStatus, JobSubmissionClient
 
 
@@ -27,19 +26,6 @@ async def wait_on_job(logs, timeout_s):
     await asyncio.wait_for(print_logs(logs), timeout=timeout_s)
 
 
-def generate_data():
-    datadir = Path(__file__).parents[2] / "gendata"
-    datadir.mkdir(parents=True, exist_ok=True)
-    scale_factor = 0.01
-    db = duckdb.connect(database=datadir / "tpcds.db")
-    db.sql(f"call dsdgen(sf = {scale_factor})")
-    for item in db.sql("show tables").fetchall():
-        tbl = item[0]
-        parquet_file = datadir / f"{tbl}.parquet"
-        print(f"Exporting {tbl} to {parquet_file}")
-        db.sql(f"COPY {tbl} TO '{parquet_file}'")
-
-
 @dataclass
 class Result:
     query: int
@@ -54,8 +40,6 @@ def submit_job(
     env_vars: str,
     enable_ray_tracing: bool,
 ):
-    generate_data()
-
     env_vars_dict = parse_env_var_str(env_vars)
     if enable_ray_tracing:
         env_vars_dict["DAFT_ENABLE_RAY_TRACING"] = "1"

From e936e9fcb031fefec78df88fc0cbdfba49ac8a5d Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 15:48:32 -0800
Subject: [PATCH 07/16] Remove extra variable

---
 benchmarking/tpcds/ray_entrypoint.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index 6fb3ea6732..178bb8edf0 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -35,7 +35,6 @@ def run(
     )
     args = parser.parse_args()
 
-    # assert args.tpcds_gen_folder.exists()
     assert args.question in range(1, 100)
 
-    run(args.tpcds_gen_folder, args.question, args.dry_run)
+    run(args.question, args.dry_run)

From 5c14913c7914d357e444e80234ac6fc7a4b53e2d Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 16:22:00 -0800
Subject: [PATCH 08/16] Change up catalog registration

---
 benchmarking/tpcds/ray_entrypoint.py | 44 ++++++++++++++++++++++++++--
 1 file changed, 41 insertions(+), 3 deletions(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index 178bb8edf0..f727ce3664 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -1,16 +1,54 @@
 import argparse
 from pathlib import Path
 
-import helpers
-
 import daft
+from daft.sql.sql import SQLCatalog
+
+TABLE_NAMES = [
+    "call_center",
+    "catalog_page",
+    "catalog_returns",
+    "catalog_sales",
+    "customer",
+    "customer_address",
+    "customer_demographics",
+    "date_dim",
+    "household_demographics",
+    "income_band",
+    "inventory",
+    "item",
+    "promotion",
+    "reason",
+    "ship_mode",
+    "store",
+    "store_returns",
+    "store_sales",
+    "time_dim",
+    "tpcds",
+    "warehouse",
+    "web_page",
+    "web_returns",
+    "web_sales",
+    "web_site",
+]
+
+
+def register_catalog() -> SQLCatalog:
+    return SQLCatalog(
+        tables={
+            table: daft.read_parquet(
+                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/2/{table}.parquet"
+            )
+            for table in TABLE_NAMES
+        }
+    )
 
 
 def run(
     question: int,
     dry_run: bool,
 ):
-    catalog = helpers.generate_catalog("s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/2/")
+    catalog = register_catalog()
     query_file = Path(__file__).parent / "queries" / f"{question:02}.sql"
     with open(query_file) as f:
         query = f.read()

From b96aed24144ca32dd8ddebca0ea78a4507378e09 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 16:29:37 -0800
Subject: [PATCH 09/16] Generate catalog off of s3 urls instead

---
 benchmarking/tpcds/ray_entrypoint.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index f727ce3664..e55c1dc925 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -24,7 +24,6 @@
     "store_returns",
     "store_sales",
     "time_dim",
-    "tpcds",
     "warehouse",
     "web_page",
     "web_returns",
@@ -37,7 +36,7 @@ def register_catalog() -> SQLCatalog:
     return SQLCatalog(
         tables={
             table: daft.read_parquet(
-                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/2/{table}.parquet"
+                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/2/{table}.parquet/"
             )
             for table in TABLE_NAMES
         }

From dbe5f8cd4ca7fe5dcf81664dabd858e704944b09 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 16:35:26 -0800
Subject: [PATCH 10/16] Remove the removal of the daft dir

---
 .github/workflows/run-cluster.yaml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.github/workflows/run-cluster.yaml b/.github/workflows/run-cluster.yaml
index abc8dc17a4..eeeb3e436a 100644
--- a/.github/workflows/run-cluster.yaml
+++ b/.github/workflows/run-cluster.yaml
@@ -108,7 +108,6 @@ jobs:
           echo 'Invalid command submitted; command cannot be empty'
           exit 1
         fi
-        rm -rf daft
         python .github/ci-scripts/job_runner.py \
           --working-dir='${{ inputs.working_dir }}' \
           --entrypoint-script='${{ inputs.entrypoint_script }}' \

From 54e2269ee8d670cb3305f521a8c1ed7bf067c995 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 16:42:31 -0800
Subject: [PATCH 11/16] Add scale-factor argument

---
 benchmarking/tpcds/ray_entrypoint.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index e55c1dc925..08dd69ee06 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -32,11 +32,11 @@
 ]
 
 
-def register_catalog() -> SQLCatalog:
+def register_catalog(scale_factor: int) -> SQLCatalog:
     return SQLCatalog(
         tables={
             table: daft.read_parquet(
-                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/2/{table}.parquet/"
+                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/{scale_factor}/{table}.parquet/"
             )
             for table in TABLE_NAMES
         }
@@ -46,8 +46,9 @@ def register_catalog() -> SQLCatalog:
 def run(
     question: int,
     dry_run: bool,
+    scale_factor: int,
 ):
-    catalog = register_catalog()
+    catalog = register_catalog(scale_factor)
     query_file = Path(__file__).parent / "queries" / f"{question:02}.sql"
     with open(query_file) as f:
         query = f.read()
@@ -70,8 +71,13 @@ def run(
         action="store_true",
         help="Whether or not to run the query in dry-run mode; if true, only the plan will be printed out",
     )
+    parser.add_argument(
+        "--scale-factor",
+        type=int,
+        help="Which scale factor to run this data at",
+    )
 
     args = parser.parse_args()
     assert args.question in range(1, 100)
 
-    run(args.question, args.dry_run)
+    run(question=args.question, dry_run=args.dry_run, scale_factor=args.scale_factor)

From 8ed2077cfae9b1f760bffb8ca016e7e9dc130bb6 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 16:43:22 -0800
Subject: [PATCH 12/16] Add default scale-factor size

---
 benchmarking/tpcds/ray_entrypoint.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index 08dd69ee06..34f2511fa7 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -62,9 +62,9 @@ def run(
     parser = argparse.ArgumentParser()
     parser.add_argument(
         "--question",
-        required=True,
         type=int,
         help="The TPC-DS question index to run",
+        required=True,
     )
     parser.add_argument(
         "--dry-run",
@@ -75,6 +75,8 @@ def run(
         "--scale-factor",
         type=int,
         help="Which scale factor to run this data at",
+        required=False,
+        default=2,
     )
 
     args = parser.parse_args()

From 674ca8f680a6c3d230aeadfaef713d1431e4c6a6 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Mon, 16 Dec 2024 20:17:31 -0800
Subject: [PATCH 13/16] Remove duckdb dep

---
 .github/workflows/run-cluster.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/run-cluster.yaml b/.github/workflows/run-cluster.yaml
index eeeb3e436a..d6bc4a43ee 100644
--- a/.github/workflows/run-cluster.yaml
+++ b/.github/workflows/run-cluster.yaml
@@ -71,7 +71,7 @@ jobs:
       run: |
         uv v
         source .venv/bin/activate
-        uv pip install ray[default] boto3 duckdb
+        uv pip install ray[default] boto3
     - name: Dynamically update ray config file
       run: |
         source .venv/bin/activate

From debb5b8e4982431ef10ac4b51362f9f17e7d8ea2 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Tue, 17 Dec 2024 11:43:33 -0800
Subject: [PATCH 14/16] Add inline metadata to job_runner script

---
 .github/ci-scripts/job_runner.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/.github/ci-scripts/job_runner.py b/.github/ci-scripts/job_runner.py
index f216fa39ed..12c949136f 100644
--- a/.github/ci-scripts/job_runner.py
+++ b/.github/ci-scripts/job_runner.py
@@ -1,3 +1,8 @@
+# /// script
+# requires-python = ">=3.12"
+# dependencies = []
+# ///
+
 import argparse
 import asyncio
 import json

From 92f227c524f276cfe376e9af689d6043b37b814a Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Tue, 17 Dec 2024 11:45:52 -0800
Subject: [PATCH 15/16] Edit description

---
 .github/workflows/run-cluster.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/run-cluster.yaml b/.github/workflows/run-cluster.yaml
index d6bc4a43ee..e0262be2cb 100644
--- a/.github/workflows/run-cluster.yaml
+++ b/.github/workflows/run-cluster.yaml
@@ -34,7 +34,7 @@ on:
         type: string
         required: true
       entrypoint_args:
-        description: Entry-point arguments
+        description: Entry-point arguments (either a simple string or a JSON list)
         type: string
         required: false
         default: ""

From 5b5a9f9fb2823337728759997fddd53853fdc4a5 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Tue, 17 Dec 2024 11:52:24 -0800
Subject: [PATCH 16/16] Remove trailing slash

---
 benchmarking/tpcds/ray_entrypoint.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/benchmarking/tpcds/ray_entrypoint.py b/benchmarking/tpcds/ray_entrypoint.py
index 34f2511fa7..20656b37b9 100644
--- a/benchmarking/tpcds/ray_entrypoint.py
+++ b/benchmarking/tpcds/ray_entrypoint.py
@@ -36,7 +36,7 @@ def register_catalog(scale_factor: int) -> SQLCatalog:
     return SQLCatalog(
         tables={
             table: daft.read_parquet(
-                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/{scale_factor}/{table}.parquet/"
+                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/{scale_factor}/{table}.parquet"
             )
             for table in TABLE_NAMES
         }
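
A note on the mechanism this series introduces: the --entrypoint-args flag of .github/ci-scripts/job_runner.py accepts either a plain string or a JSON-encoded list of strings, and the runner submits one Ray job per element (patches 01 and 15). Below is a minimal sketch of that dispatch, mirroring the branch in job_runner.py; the helper name split_entrypoint_args and the example argument values are illustrative only and not part of the series.

    import json

    def split_entrypoint_args(entrypoint_args: str) -> list[str]:
        # Mirrors the branch in job_runner.py: a JSON-encoded list of strings
        # fans out into one Ray job submission per element; any other string
        # is treated as the argument string for a single job.
        if entrypoint_args.startswith("[") and entrypoint_args.endswith("]"):
            return json.loads(entrypoint_args)
        return [entrypoint_args]

    print(split_entrypoint_args('["--question=1", "--question=2"]'))  # two jobs
    print(split_entrypoint_args("--question=3 --scale-factor=2"))     # one job

Under this scheme a single workflow dispatch can sweep many TPC-DS questions by passing a JSON list, while existing single-string invocations keep working unchanged.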