ci: Improve visualization of tpcds + tpch benchmarking outputs (#3654)
# Overview

This PR adds the ability for the `run-cluster` GitHub Actions workflow
to output:
1. a file containing the full printed query plan for a given question
2. additional stats on the planning and execution times for a given question

## Implementation

These files are written to `/tmp/ray` on the cluster. The `run-cluster`
workflow downloads that directory and then uploads it to the GitHub
Actions summary page.
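
For reference, the emitted files can be inspected straight off the filesystem after a local run. Here is a minimal sketch (not part of this PR) that assumes the `plan-{question}.txt` / `stats-{question}.txt` names used in the diff below:

```python
# Sketch: print every plan/stats file written into Ray's session log
# directory, mirroring the info_path used by ray_entrypoint.py below.
from pathlib import Path

info_dir = Path("/tmp/ray/session_latest/logs/info")
for path in sorted(info_dir.glob("*.txt")):
    print(f"=== {path.name} ===")
    print(path.read_text())
```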

## Usage

You can run this locally or through CI.

### Locally

```sh
DAFT_RUNNER=ray python benchmarking/tpcds/ray_entrypoint.py --question=1 --scale-factor=1
```

### Remotely

```sh
uv run tools/tpcds.py --questions='1,2' --scale-factor=1
```
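
Note that `--scale-factor=1` relies on `1` being added to the allowed choices in `tools/tpcds.py`, which is part of this change (see the diff below).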
raunakab authored Jan 9, 2025
1 parent 3479b1c commit 43bbbeb
Showing 3 changed files with 34 additions and 6 deletions.
35 changes: 31 additions & 4 deletions benchmarking/tpcds/ray_entrypoint.py
```diff
@@ -1,4 +1,6 @@
 import argparse
+import json
+from datetime import datetime
 from pathlib import Path
 
 import daft
@@ -36,7 +38,7 @@ def register_catalog(scale_factor: int) -> SQLCatalog:
     return SQLCatalog(
         tables={
             table: daft.read_parquet(
-                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/{scale_factor}/{table}.parquet"
+                f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/{scale_factor}/{table}"
             )
             for table in TABLE_NAMES
         }
@@ -51,11 +53,36 @@ def run(
     catalog = register_catalog(scale_factor)
     query_file = Path(__file__).parent / "queries" / f"{question:02}.sql"
     with open(query_file) as f:
-        query = f.read()
+        query_string = f.read()
 
-    daft.sql(query, catalog=catalog).explain(show_all=True)
+    info_path = Path("/tmp") / "ray" / "session_latest" / "logs" / "info"
+    info_path.mkdir(parents=True, exist_ok=True)
+    query = daft.sql(query_string, catalog=catalog)
+
+    explain_delta = None
+    with open(info_path / f"plan-{question}.txt", "w") as f:
+        explain_start = datetime.now()
+        query.explain(show_all=True, file=f, format="mermaid")
+        explain_end = datetime.now()
+        explain_delta = explain_end - explain_start
+
+    execute_delta = None
     if not dry_run:
-        daft.sql(query, catalog=catalog).collect()
+        execute_start = datetime.now()
+        query.collect()
+        execute_end = datetime.now()
+        execute_delta = execute_end - execute_start
+
+    with open(info_path / f"stats-{question}.txt", "w") as f:
+        stats = json.dumps(
+            {
+                "question": question,
+                "scale-factor": scale_factor,
+                "planning-time": explain_delta,
+                "execution-time": execute_delta,
+            }
+        )
+        f.write(stats)
 
 
 if __name__ == "__main__":
```
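
One caveat when adapting the snippet above: the standard-library `json.dumps` cannot serialize `datetime.timedelta` values and raises a `TypeError`, so a local variant would need to convert the deltas first. A sketch of one way to do that (an illustration, not the committed code):

```python
import json
from datetime import timedelta

# Placeholder values standing in for the measured deltas.
explain_delta = timedelta(seconds=1.25)
execute_delta = None  # e.g. after a dry run

# timedelta is not JSON-serializable, so convert to seconds first.
stats = json.dumps(
    {
        "planning-time": explain_delta.total_seconds(),
        "execution-time": execute_delta.total_seconds() if execute_delta else None,
    }
)
print(stats)  # {"planning-time": 1.25, "execution-time": null}
```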
3 changes: 2 additions & 1 deletion tools/tpcds.py
```diff
@@ -15,7 +15,7 @@
 
 
 def run(
-    branch_name: str,
+    branch_name: Optional[str],
     questions: Optional[str],
     scale_factor: int,
     cluster_profile: str,
@@ -49,6 +49,7 @@ def run(
     parser.add_argument(
         "--scale-factor",
         choices=[
+            1,
             2,
             5,
             10,
```
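
Widening `branch_name` to `Optional[str]` lets callers omit the branch entirely. A hedged sketch of the calling convention this enables (the flag name is an assumption for illustration; the argument wiring is not shown in this diff):

```python
import argparse
from typing import Optional

parser = argparse.ArgumentParser()
# Assumed flag name; defaults to None when omitted on the command line.
parser.add_argument("--branch-name", type=str, default=None)
args = parser.parse_args()

branch_name: Optional[str] = args.branch_name
print(branch_name)  # None unless --branch-name is passed
```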
2 changes: 1 addition & 1 deletion tools/tpch.py
```diff
@@ -17,7 +17,7 @@
 
 
 def run(
-    branch_name: str,
+    branch_name: Optional[str],
     questions: Optional[str],
     scale_factor: int,
     num_partitions: int,
```
