Skip to content

Commit

Permalink
Update SandboxMetadataDB interface
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisburr committed Oct 3, 2023
1 parent f098104 commit 3d69d65
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 134 deletions.
11 changes: 11 additions & 0 deletions src/diracx/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,14 @@ class UserInfo(BaseModel):

class ChecksumAlgorithm(StrEnum):
SHA256 = "sha256"


class SandboxFormat(StrEnum):
TAR_BZ2 = "tar.bz2"


class SandboxInfo(BaseModel):
    """Description of a sandbox archive: its checksum, size and format."""

    # Algorithm that produced ``checksum``.
    checksum_algorithm: ChecksumAlgorithm
    # Hexadecimal digest of exactly 64 characters. The character class is
    # spelled out explicitly: the range ``0-f`` spans every ASCII character
    # between "0" and "f", which also admits punctuation and the letters G-Z.
    checksum: str = Field(pattern=r"^[0-9a-fA-F]{64}$")
    # Must be at least 1 (``ge=1``), so an empty sandbox is rejected.
    size: int = Field(ge=1)
    # Archive format of the sandbox.
    format: SandboxFormat
108 changes: 58 additions & 50 deletions src/diracx/db/sql/sandbox_metadata/db.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,88 @@
""" SandboxMetadataDB frontend
"""

from __future__ import annotations

import datetime

import sqlalchemy

from diracx.db.sql.utils import BaseSQLDB
from diracx.core.models import SandboxInfo, UserInfo
from diracx.db.sql.utils import BaseSQLDB, utcnow

from .schema import Base as SandboxMetadataDBBase
from .schema import sb_Owners, sb_SandBoxes

# In legacy DIRAC the SEName column was used to support multiple different
# storage backends. This is no longer the case, so we hardcode a single SE
# name, and PFNs carry the S3 prefix to represent the new DiracX system.
SE_NAME = "ProductionSandboxSE"
PFN_PREFIX = "/S3/"


class SandboxMetadataDB(BaseSQLDB):
metadata = SandboxMetadataDBBase.metadata

async def _get_put_owner(self, owner: str, owner_group: str) -> int:
"""adds a new owner/ownerGroup pairs, while returning their ID if already existing
Args:
owner (str): user name
owner_group (str): group of the owner
"""
async def upsert_owner(self, user: UserInfo) -> int:
"""Get the id of the owner from the database"""
# TODO: Follow https://github.com/DIRACGrid/diracx/issues/49
stmt = sqlalchemy.select(sb_Owners.OwnerID).where(
sb_Owners.Owner == owner, sb_Owners.OwnerGroup == owner_group
sb_Owners.Owner == user.preferred_username,
sb_Owners.OwnerGroup == user.dirac_group,
# TODO: Add VO
)
result = await self.conn.execute(stmt)
if owner_id := result.scalar_one_or_none():
return owner_id

stmt = sqlalchemy.insert(sb_Owners).values(Owner=owner, OwnerGroup=owner_group)
stmt = sqlalchemy.insert(sb_Owners).values(
Owner=user.preferred_username,
OwnerGroup=user.dirac_group,
)
result = await self.conn.execute(stmt)
return result.lastrowid

async def insert(
self, owner: str, owner_group: str, sb_SE: str, se_PFN: str, size: int = 0
) -> tuple[int, bool]:
"""inserts a new sandbox in SandboxMetadataDB
this is "equivalent" of DIRAC registerAndGetSandbox
@staticmethod
def get_pfn(bucket_name: str, user: UserInfo, sandbox_info: SandboxInfo) -> str:
"""Get the sandbox's user namespaced and content addressed PFN"""
parts = [
"S3",
bucket_name,
user.vo,
user.dirac_group,
user.preferred_username,
f"{sandbox_info.checksum_algorithm}:{sandbox_info.checksum}.{sandbox_info.format}",
]
return "/" + "/".join(parts)

Args:
owner (str): user name_
owner_group (str): groupd of the owner
sb_SE (str): _description_
sb_PFN (str): _description_
size (int, optional): _description_. Defaults to 0.
"""
owner_id = await self._get_put_owner(owner, owner_group)
async def insert_sandbox(self, user: UserInfo, pfn: str, size: int) -> None:
"""Add a new sandbox in SandboxMetadataDB"""
# TODO: Follow https://github.com/DIRACGrid/diracx/issues/49
owner_id = await self.upsert_owner(user)
stmt = sqlalchemy.insert(sb_SandBoxes).values(
OwnerId=owner_id, SEName=sb_SE, SEPFN=se_PFN, Bytes=size
OwnerId=owner_id,
SEName=SE_NAME,
SEPFN=pfn,
Bytes=size,
RegistrationTime=utcnow(),
LastAccessTime=utcnow(),
)
try:
result = await self.conn.execute(stmt)
return result.lastrowid
except sqlalchemy.exc.IntegrityError:
# it is a duplicate, try to retrieve SBiD
stmt: sqlalchemy.Executable = sqlalchemy.select(sb_SandBoxes.SBId).where( # type: ignore[no-redef]
sb_SandBoxes.SEPFN == se_PFN,
sb_SandBoxes.SEName == sb_SE,
sb_SandBoxes.OwnerId == owner_id,
)
result = await self.conn.execute(stmt)
sb_ID = result.scalar_one()
stmt: sqlalchemy.Executable = ( # type: ignore[no-redef]
sqlalchemy.update(sb_SandBoxes)
.where(sb_SandBoxes.SBId == sb_ID)
.values(LastAccessTime=datetime.datetime.utcnow())
)
await self.conn.execute(stmt)
return sb_ID
await self.update_sandbox_last_access_time(pfn)
else:
assert result.rowcount == 1

async def delete(self, sandbox_ids: list[int]) -> bool:
stmt: sqlalchemy.Executable = sqlalchemy.delete(sb_SandBoxes).where(
sb_SandBoxes.SBId.in_(sandbox_ids)
async def update_sandbox_last_access_time(self, pfn: str) -> None:
stmt = (
sqlalchemy.update(sb_SandBoxes)
.where(sb_SandBoxes.SEName == SE_NAME, sb_SandBoxes.SEPFN == pfn)
.values(LastAccessTime=utcnow())
)
await self.conn.execute(stmt)
result = await self.conn.execute(stmt)
assert result.rowcount == 1

return True
async def sandbox_is_assigned(self, pfn: str) -> bool:
    """Return the ``Assigned`` flag of the sandbox stored under ``pfn``.

    Only rows whose SEName equals ``SE_NAME`` are considered. The value is
    read with ``scalar_one``, so SQLAlchemy raises ``NoResultFound`` when no
    row matches and ``MultipleResultsFound`` when more than one does.
    """
    query: sqlalchemy.Executable = (
        sqlalchemy.select(sb_SandBoxes.Assigned)
        .where(sb_SandBoxes.SEName == SE_NAME)
        .where(sb_SandBoxes.SEPFN == pfn)
    )
    rows = await self.conn.execute(query)
    return rows.scalar_one()
84 changes: 0 additions & 84 deletions tests/db/test_sandboxMetadataDB.py

This file was deleted.

92 changes: 92 additions & 0 deletions tests/db/test_sandbox_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from __future__ import annotations

import asyncio
import secrets
from datetime import datetime

import pytest
import sqlalchemy

from diracx.core.models import SandboxInfo, UserInfo
from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB
from diracx.db.sql.sandbox_metadata.schema import sb_SandBoxes


@pytest.fixture
async def sandbox_metadata_db(tmp_path):
    """Yield a SandboxMetadataDB backed by an in-memory SQLite database."""
    db = SandboxMetadataDB("sqlite+aiosqlite:///:memory:")
    async with db.engine_context():
        # Create the tables in a dedicated transaction that is closed before
        # the database is handed to the test.
        async with db.engine.begin() as connection:
            await connection.run_sync(db.metadata.create_all)
        # NOTE(review): yielding inside engine_context presumably keeps the
        # engine (and the in-memory database) alive for the test - confirm.
        yield db


def test_get_pfn(sandbox_metadata_db: SandboxMetadataDB):
    """The PFN is namespaced by VO, group and user and addressed by checksum."""
    user_info = UserInfo(
        sub="vo:sub", preferred_username="user1", dirac_group="group1", vo="vo"
    )
    # Use a well-formed digest (64 hexadecimal characters). The previous
    # placeholder "checksum" does not satisfy the pattern declared on
    # SandboxInfo.checksum, so the test only worked while that constraint
    # was not enforced.
    checksum = "0123456789abcdef" * 4
    sandbox_info = SandboxInfo(
        checksum=checksum,
        checksum_algorithm="sha256",
        format="tar.bz2",
        size=100,
    )
    pfn = sandbox_metadata_db.get_pfn("bucket1", user_info, sandbox_info)
    assert pfn == f"/S3/bucket1/vo/group1/user1/sha256:{checksum}.tar.bz2"


async def test_insert_sandbox(sandbox_metadata_db: SandboxMetadataDB):
    """Insert a sandbox twice, then refresh its last access time explicitly."""
    user = UserInfo(
        sub="vo:sub", preferred_username="user1", dirac_group="group1", vo="vo"
    )
    pfn = secrets.token_hex()

    # A freshly generated PFN must not be known to the database yet
    assert pfn not in await _dump_db(sandbox_metadata_db)
    async with sandbox_metadata_db:
        with pytest.raises(sqlalchemy.exc.NoResultFound):
            await sandbox_metadata_db.sandbox_is_assigned(pfn)

    # The first insertion registers the sandbox
    async with sandbox_metadata_db:
        await sandbox_metadata_db.insert_sandbox(user, pfn, 100)
    first_owner_id, first_access = (await _dump_db(sandbox_metadata_db))[pfn]

    # Inserting the same PFN again keeps the owner and moves the last access
    # time forward
    await asyncio.sleep(1)  # The timestamp only has second precision
    async with sandbox_metadata_db:
        await sandbox_metadata_db.insert_sandbox(user, pfn, 100)
    second_owner_id, second_access = (await _dump_db(sandbox_metadata_db))[pfn]
    assert first_owner_id == second_owner_id
    assert second_access > first_access

    # The sandbox still hasn't been assigned
    async with sandbox_metadata_db:
        assert not await sandbox_metadata_db.sandbox_is_assigned(pfn)

    # Waiting (and the assignment check above) leaves the last access time
    # untouched; only the explicit update moves it forward
    await asyncio.sleep(1)  # The timestamp only has second precision
    unchanged_access = (await _dump_db(sandbox_metadata_db))[pfn][1]
    assert second_access == unchanged_access
    async with sandbox_metadata_db:
        await sandbox_metadata_db.update_sandbox_last_access_time(pfn)
    refreshed_access = (await _dump_db(sandbox_metadata_db))[pfn][1]
    assert second_access < refreshed_access


async def _dump_db(
    sandbox_metadata_db: SandboxMetadataDB,
) -> dict[str, tuple[int, datetime]]:
    """Return every sandbox row keyed by PFN.

    The result maps each SEPFN to a ``(OwnerId, LastAccessTime)`` tuple.
    """
    query = sqlalchemy.select(
        sb_SandBoxes.SEPFN, sb_SandBoxes.OwnerId, sb_SandBoxes.LastAccessTime
    )
    async with sandbox_metadata_db:
        rows = await sandbox_metadata_db.conn.execute(query)
        return {
            record.SEPFN: (record.OwnerId, record.LastAccessTime) for record in rows
        }

0 comments on commit 3d69d65

Please sign in to comment.