Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

fix: automatic round-down to quantum size #554

Merged
merged 17 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/554.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Recalculate `kernels.occupied_slots` column with actually allocated resource slots value.
2 changes: 0 additions & 2 deletions src/ai/backend/manager/models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,12 @@ async def from_image_ref(
with respect to requested canonical, this function will
return that row regardless of actual architecture.
"""
log.debug('from_image_ref(): {} ({})', ref.canonical, ref.architecture)
query = sa.select(ImageRow).where(ImageRow.name == ref.canonical)
if load_aliases:
query = query.options(selectinload(ImageRow.aliases))

result = await session.execute(query)
candidates: List[ImageRow] = result.scalars().all()
log.debug('rows: {}', candidates)

if len(candidates) == 0:
raise UnknownImageReference
Expand Down
67 changes: 55 additions & 12 deletions src/ai/backend/manager/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,14 +1254,13 @@ async def _post_create_kernel(

async def _finialize_running() -> None:
# Record kernel access information
async with self.db.begin() as conn:
agent_host = URL(agent_alloc_ctx.agent_addr).host
kernel_host = created_info.get('kernel_host', agent_host)
service_ports = created_info.get('service_ports', [])
# NOTE: created_info contains resource_spec
query = (
kernels.update()
.values({
try:
async with self.db.begin() as conn:
agent_host = URL(agent_alloc_ctx.agent_addr).host
kernel_host = created_info.get('kernel_host', agent_host)
service_ports = created_info.get('service_ports', [])
# NOTE: created_info contains resource_spec
values = {
'scaling_group': agent_alloc_ctx.scaling_group,
'status': KernelStatus.RUNNING,
'container_id': created_info['container_id'],
Expand All @@ -1273,10 +1272,54 @@ async def _finialize_running() -> None:
'stdin_port': created_info['stdin_port'],
'stdout_port': created_info['stdout_port'],
'service_ports': service_ports,
})
.where(kernels.c.id == created_info['id']))
await conn.execute(query)

}
actual_allocs: MutableMapping[str, str] = {}
for alloc_map in created_info['resource_spec']['allocations'].values():
for slot_name, allocations in alloc_map.items():
total_allocs: List[Decimal] = []
for allocation in allocations.values():
if BinarySize.suffix_map.get(allocation[-1].lower()) is not None:
total_allocs.append(Decimal(BinarySize.from_str(allocation)))
else:
total_allocs.append(Decimal(allocation))
actual_allocs[slot_name] = str(sum(total_allocs))
kyujin-cho marked this conversation as resolved.
Show resolved Hide resolved

values['occupied_slots'] = actual_allocs
query = (
kyujin-cho marked this conversation as resolved.
Show resolved Hide resolved
sa.select([kernels.c.occupied_slots])
.select_from(kernels)
.where(kernels.c.id == created_info['id'])
)
requested_slots = await conn.scalar(query)
query = (
sa.select([agents.c.occupied_slots])
.select_from(agents)
.where(agents.c.id == agent_alloc_ctx.agent_id)
)
agent_occupied_slots = await conn.scalar(query)
alloc_diffs = {
k: Decimal(requested_slots[k]) - Decimal(v)
for k, v in actual_allocs.items()
}
if any([v != 0 for v in alloc_diffs.values()]):
agent_actual_allocated_slots = {
k: str(Decimal(agent_occupied_slots[k]) - v)
for k, v in alloc_diffs.items()
}
query = (
kyujin-cho marked this conversation as resolved.
Show resolved Hide resolved
agents.update()
.values({'occupied_slots': agent_actual_allocated_slots})
.where(agents.c.id == agent_alloc_ctx.agent_id)
)
await conn.execute(query)
kyujin-cho marked this conversation as resolved.
Show resolved Hide resolved
query = (
kernels.update()
.values(values)
.where(kernels.c.id == created_info['id']))
await conn.execute(query)
except Exception as e:
log.exception('error while executing _finalize_running', exc_info=e)
kyujin-cho marked this conversation as resolved.
Show resolved Hide resolved
raise e
await execute_with_retry(_finialize_running)
finally:
try:
Expand Down