Prediction API #5

Merged
merged 37 commits on May 1, 2024
Commits (37)
33cf4b3
first prediction functionality
cmelone Jan 24, 2024
24cfb69
1
cmelone Jan 30, 2024
8421602
add basic routing
cmelone Jan 30, 2024
e7bcc9f
check whether predictions are lower than current allocation
cmelone Jan 31, 2024
c2da0e0
adjust conversions
cmelone Jan 31, 2024
169fab5
fix some bugs
cmelone Jan 31, 2024
3464898
no black current_mapping
cmelone Jan 31, 2024
2bd4d98
readability
cmelone Feb 1, 2024
0813027
allocate -> allocation
cmelone Feb 6, 2024
63a1404
use GET endpoint rather than POST for allocation
cmelone Feb 6, 2024
960f89b
simplify validation logic
cmelone Feb 6, 2024
2f65298
bulk predictions run in parallel
cmelone Feb 6, 2024
5714aae
fmt changes
cmelone Feb 9, 2024
22d87e5
cpu units now in cores rather than millicores
cmelone Feb 9, 2024
27b07f5
request limit 8MB
cmelone Feb 9, 2024
3f2b7ff
add variant filtering
cmelone Feb 9, 2024
a87f0f1
small fixes
cmelone Feb 1, 2024
031706e
updated standard allocations
cmelone Feb 15, 2024
1125971
Re-write the variant filtering logic.
cmelone Feb 16, 2024
ce9bab3
formatting
cmelone Feb 16, 2024
00795b2
formatting 😵‍💫
cmelone Feb 16, 2024
9affc34
logging -> logger usage
cmelone Feb 16, 2024
777c175
remove unnecessary comment [ci skip]
cmelone Feb 16, 2024
7aeac68
move validate payload for prediction into util for easier testing
cmelone Feb 27, 2024
feda9c6
make a couple changes to prediction to make testing easier
cmelone Feb 27, 2024
29b99ad
remove bulk prediction functionality
cmelone Feb 27, 2024
d8b0598
fix: remove unnecessary condition in validate_payload
cmelone Feb 27, 2024
50aead8
fix: remove custom GET length, 8K is sufficient for one url-encoded j…
cmelone Mar 4, 2024
f5f2363
fix: remove hash from GET payload
cmelone Mar 4, 2024
341c5a3
fix: update docstring for allocation view
cmelone Mar 6, 2024
558d575
allocation API now accepts spec string rather than JSON payload
cmelone Mar 7, 2024
54658ce
formatting fixes
cmelone Mar 7, 2024
1d4278c
formatting fixes
cmelone Mar 7, 2024
b552cfc
black is not being consistent :(
cmelone Mar 7, 2024
67e0438
fix: race condition where spec passed by reference was modified by `g…
cmelone Mar 8, 2024
e3b2602
`predict_single` -> `predict`
cmelone Apr 25, 2024
00821d4
decouple prediction strategy from env variable
cmelone Apr 25, 2024
6 changes: 0 additions & 6 deletions docs/deploy.md
@@ -28,9 +28,3 @@ The following variables should be exposed to the container. Those **bolded** are
- **`GITLAB_API_TOKEN`** - this token should have API read access
- **`GITLAB_WEBHOOK_TOKEN`** - coordinate this value with the collection webhook
- **`DB_FILE`** - path where the application can access the SQLite file
- `MAX_GET_SIZE` - the maximum `GET` request (in bytes), default is 8MB
- `GANTRY_HOST` - web app hostname, default is `localhost`
- `GANTRY_PORT` - web app port, default is `8080`
- `PREDICT_STRATEGY` - optional mode for the prediction algorithm
- options:
- `ensure_higher`: if the predicted resource usage is below current levels, it will disregard the prediction and keep what would be allocated without Gantry's intervention
Empty file.
3 changes: 3 additions & 0 deletions gantry/routes/prediction/current_mapping.py

Large diffs are not rendered by default.

195 changes: 195 additions & 0 deletions gantry/routes/prediction/prediction.py
@@ -0,0 +1,195 @@
import logging

import aiosqlite

from gantry.routes.prediction.current_mapping import pkg_mappings
from gantry.util import k8s

logger = logging.getLogger(__name__)

IDEAL_SAMPLE = 5
DEFAULT_CPU_REQUEST = 1.0
DEFAULT_MEM_REQUEST = 2 * 1_000_000_000 # 2GB in bytes
EXPENSIVE_VARIANTS = {
"sycl",
"mpi",
"rocm",
"cuda",
"python",
"fortran",
"openmp",
"hdf5",
}


async def predict(db: aiosqlite.Connection, spec: dict, strategy: str = None) -> dict:
"""
Predict the resource usage of a spec

args:
spec: dict that contains pkg_name, pkg_version, pkg_variants,
compiler_name, compiler_version
strategy (optional): custom prediction behavior
"ensure_higher": if the predicted resource usage is
below current levels, it will disregard the prediction and
keep what would be allocated without Gantry's intervention
returns:
dict of predicted resource usage: cpu_request, mem_request
        CPU in cores, mem in MB
"""

sample = await get_sample(db, spec)
predictions = {}
if not sample:
predictions = {
"cpu_request": DEFAULT_CPU_REQUEST,
"mem_request": DEFAULT_MEM_REQUEST,
}
else:
# mapping of sample: [0] cpu_mean, [1] cpu_max, [2] mem_mean, [3] mem_max
predictions = {
# averages the respective metric in the sample
# cpu should always be whole number
"cpu_request": round(sum([build[0] for build in sample]) / len(sample)),
"mem_request": sum([build[2] for build in sample]) / len(sample),
}

if strategy == "ensure_higher":
ensure_higher_pred(predictions, spec["pkg_name"])

# warn if the prediction is below some thresholds
if predictions["cpu_request"] < 0.25:
logger.warning(f"Warning: CPU request for {spec} is below 0.25 cores")
predictions["cpu_request"] = DEFAULT_CPU_REQUEST
if predictions["mem_request"] < 10_000_000:
logger.warning(f"Warning: Memory request for {spec} is below 10MB")
predictions["mem_request"] = DEFAULT_MEM_REQUEST

# convert predictions to k8s friendly format
for k, v in predictions.items():
if k.startswith("cpu"):
predictions[k] = str(int(v))
elif k.startswith("mem"):
predictions[k] = k8s.convert_bytes(v)

return {
"variables": {
# spack uses these env vars to set the resource requests
# set them here at the last minute to avoid using these vars
# and clogging up the code
"KUBERNETES_CPU_REQUEST": predictions["cpu_request"],
"KUBERNETES_MEMORY_REQUEST": predictions["mem_request"],
},
}


async def get_sample(db: aiosqlite.Connection, spec: dict) -> list:
"""
Selects a sample of builds to use for prediction

args:
spec: see predict
returns:
list of lists with cpu_mean, cpu_max, mem_mean, mem_max
"""

# ranked in order of priority, the params we would like to match on
param_combos = (
(
"pkg_name",
"pkg_variants",
"pkg_version",
"compiler_name",
"compiler_version",
),
("pkg_name", "pkg_variants", "compiler_name", "compiler_version"),
("pkg_name", "pkg_variants", "pkg_version", "compiler_name"),
("pkg_name", "pkg_variants", "compiler_name"),
("pkg_name", "pkg_variants", "pkg_version"),
("pkg_name", "pkg_variants"),
)

async def select_sample(query: str, filters: dict, extra_params: list = []) -> list:
async with db.execute(query, list(filters.values()) + extra_params) as cursor:
sample = await cursor.fetchall()
# we can accept the sample if it's 1 shorter
if len(sample) >= IDEAL_SAMPLE - 1:
return sample
return []

for combo in param_combos:
filters = {param: spec[param] for param in combo}

# the first attempt at getting a sample is to match on all the params
# within this combo, variants included
query = f"""
SELECT cpu_mean, cpu_max, mem_mean, mem_max FROM jobs
WHERE ref='develop' AND {' AND '.join(f'{param}=?' for param in filters.keys())}
ORDER BY end DESC LIMIT {IDEAL_SAMPLE}
"""

if sample := await select_sample(query, filters):
return sample

# if we are not able to get a sufficient sample, we'll try to filter
# by expensive variants, rather than an exact variant match

filters.pop("pkg_variants")

exp_variant_conditions = []
exp_variant_values = []

# iterate through all the expensive variants and create a set of conditions
# for the select query
for var in EXPENSIVE_VARIANTS:
if var in spec["pkg_variants_dict"]:
# if the client has queried for an expensive variant, we want to ensure
# that the sample has the same exact value
exp_variant_conditions.append(
f"json_extract(pkg_variants, '$.{var}')=?"
)

exp_variant_values.append(int(spec["pkg_variants_dict"].get(var, 0)))
else:
# if an expensive variant was not queried for,
# we want to make sure that the variant was not set within the sample
# as we want to ensure that the sample is not biased towards
# the presence of expensive variants (or lack thereof)
exp_variant_conditions.append(
f"json_extract(pkg_variants, '$.{var}') IS NULL"
)

query = f"""
SELECT cpu_mean, cpu_max, mem_mean, mem_max FROM jobs
WHERE ref='develop' AND {' AND '.join(f'{param}=?' for param in filters.keys())}
AND {' AND '.join(exp_variant_conditions)}
ORDER BY end DESC LIMIT {IDEAL_SAMPLE}
"""

if sample := await select_sample(query, filters, exp_variant_values):
return sample

return []


def ensure_higher_pred(prediction: dict, pkg_name: str):
"""
Ensure that the prediction is higher than the current allocation
for the package. This will be removed in the future as we analyze
the effectiveness of the prediction model.

args:
prediction: dict of predicted resource usage: cpu_request, mem_request
pkg_name: str
"""

cur_alloc = pkg_mappings.get(pkg_name)

if cur_alloc:
prediction["cpu_request"] = max(
prediction["cpu_request"], cur_alloc["cpu_request"]
)

prediction["mem_request"] = max(
prediction["mem_request"], cur_alloc["mem_request"]
)
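
For context, here is a minimal sketch of how the new `predict` coroutine might be exercised. The database path, package name, versions, and variants are illustrative assumptions rather than values from this PR, and the sketch assumes the SQLite file already contains the `jobs` table populated by the collection route.

import asyncio
import json

import aiosqlite

from gantry.routes.prediction.prediction import predict


async def main():
    # hypothetical spec; package, versions, and variants are made up
    spec = {
        "pkg_name": "example-pkg",
        "pkg_version": "1.2.3",
        "pkg_variants": json.dumps({"cuda": True}),
        "pkg_variants_dict": {"cuda": True},
        "compiler_name": "gcc",
        "compiler_version": "12.3.0",
    }

    # assumes gantry.db exists and already has the jobs table;
    # if no sufficiently similar builds are found, the defaults
    # (1 core, 2 GB) are returned in k8s-friendly form
    async with aiosqlite.connect("gantry.db") as db:
        result = await predict(db, spec, strategy="ensure_higher")
        # e.g. {'KUBERNETES_CPU_REQUEST': '1', 'KUBERNETES_MEMORY_REQUEST': '2000M'}
        print(result["variables"])


asyncio.run(main())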
8 changes: 8 additions & 0 deletions gantry/util/k8s.py
@@ -0,0 +1,8 @@
BYTES_TO_MEGABYTES = 1 / 1_000_000

# these functions convert the predictions to k8s friendly format


def convert_bytes(bytes: float) -> str:
"""bytes to megabytes"""
return str(int(bytes * BYTES_TO_MEGABYTES)) + "M"
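
A quick illustration of the helper above: it converts bytes to decimal megabytes, truncating toward zero, and appends the `M` suffix that Kubernetes expects.

from gantry.util.k8s import convert_bytes

print(convert_bytes(2 * 1_000_000_000))  # "2000M"
print(convert_bytes(10_000_000))         # "10M"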
58 changes: 58 additions & 0 deletions gantry/util/spec.py
@@ -1,3 +1,7 @@
import json
import re


def spec_variants(spec: str) -> dict:
"""Given a spec's concrete variants, return a dict in name: value format."""
# example: +adios2~advanced_debug patches=02253c7,acb3805,b724e6a use_vtkm=on
@@ -27,3 +31,57 @@ def spec_variants(spec: str) -> dict:
variants[part[1:]] = False

return variants


def parse_alloc_spec(spec: str) -> dict:
"""
Parses a spec in the format [email protected] +json+native+treesitter%[email protected]
and returns a dictionary with the following keys:
- pkg_name: str
- pkg_version: str
- pkg_variants: str
- pkg_variants_dict: dict
- compiler: str
- compiler_version: str

Returns an empty dict if the spec is invalid.

This format is specifically used for the allocation API and is documented
for the client.
"""

# example: [email protected] +json+native+treesitter%[email protected]
spec_pattern = re.compile(r"(.+?)@([\d.]+)\s+(.+?)%([\w-]+)@([\d.]+)")

match = spec_pattern.match(spec)
if not match:
return {}

# groups in order
# create a dictionary with the keys and values
(
pkg_name,
pkg_version,
pkg_variants,
compiler_name,
compiler_version,
) = match.groups()

pkg_variants_dict = spec_variants(pkg_variants)
if not pkg_variants_dict:
return {}

spec_dict = {
"pkg_name": pkg_name,
"pkg_version": pkg_version,
# two representations of the variants are returned here
# to cut down on repeated conversions in later functions
# variants are represented as JSON in the database
"pkg_variants": json.dumps(pkg_variants_dict),
# variants dict is also returned for the client
"pkg_variants_dict": pkg_variants_dict,
"compiler_name": compiler_name,
"compiler_version": compiler_version,
}

return spec_dict
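
For reference, a sketch of what `parse_alloc_spec` returns for a spec in the documented format. The package, versions, and variants are hypothetical, and the commented output assumes `spec_variants` maps `+name` to True, as its docstring example suggests.

from gantry.util.spec import parse_alloc_spec

# hypothetical spec following pkg_name@pkg_version +variants%compiler@compiler_version
parsed = parse_alloc_spec("example-pkg@1.2.3 +cuda+mpi%gcc@12.3.0")

# roughly:
# {
#     "pkg_name": "example-pkg",
#     "pkg_version": "1.2.3",
#     "pkg_variants": '{"cuda": true, "mpi": true}',
#     "pkg_variants_dict": {"cuda": True, "mpi": True},
#     "compiler_name": "gcc",
#     "compiler_version": "12.3.0",
# }
print(parsed)

# anything that does not match the pattern parses to an empty dict
assert parse_alloc_spec("not a spec") == {}

Note that a spec without any variants also comes back as an empty dict, since `parse_alloc_spec` requires a non-empty variants dictionary.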
37 changes: 37 additions & 0 deletions gantry/views.py
@@ -6,6 +6,8 @@
from aiohttp import web

from gantry.routes.collection import fetch_job
from gantry.routes.prediction.prediction import predict
from gantry.util.spec import parse_alloc_spec

logger = logging.getLogger(__name__)
routes = web.RouteTableDef()
@@ -35,3 +37,38 @@ async def collect_job(request: web.Request) -> web.Response:
)

return web.Response(status=200)


@routes.get("/v1/allocation")
async def allocation(request: web.Request) -> web.Response:
"""
Given a spec return environment variables
that set resource allocations based on historical data.

acceptable spec format:
pkg_name@pkg_version +variant1+variant2%compiler@compiler_version
NOTE: there must be a space between the package version and the variants

returns:

{
"variables": {}
}

the variables key contains the environment variables
that should be set within the build environment
example: KUBERNETES_CPU_REQUEST, KUBERNETES_CPU_LIMIT, etc.
"""
spec = request.query.get("spec")

if not spec:
return web.Response(status=400, text="missing spec parameter")

parsed_spec = parse_alloc_spec(spec)
if not parsed_spec:
return web.Response(status=400, text="invalid spec")

# we want to keep predictions >= current levels (with ensure_higher strategy)
return web.json_response(
await predict(request.app["db"], parsed_spec, strategy="ensure_higher")
)
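
To round out the picture, a sketch of how a client might call the new endpoint, assuming the web app is reachable at the default localhost:8080; the spec string is hypothetical. Percent-encoding the spec keeps the `+` variant markers and the required space intact across the query string.

import json
import urllib.parse
import urllib.request

# hypothetical spec; note the space between the version and the variants
spec = "example-pkg@1.2.3 +cuda+mpi%gcc@12.3.0"

url = "http://localhost:8080/v1/allocation?spec=" + urllib.parse.quote(spec)

with urllib.request.urlopen(url) as resp:
    allocation = json.load(resp)

# expected shape:
# {"variables": {"KUBERNETES_CPU_REQUEST": "...", "KUBERNETES_MEMORY_REQUEST": "..."}}
print(allocation["variables"])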