diff --git a/src/diracx/core/models.py b/src/diracx/core/models.py index 930b3d5f..21fc53a9 100644 --- a/src/diracx/core/models.py +++ b/src/diracx/core/models.py @@ -116,3 +116,14 @@ class UserInfo(BaseModel): class ChecksumAlgorithm(StrEnum): SHA256 = "sha256" + + +class SandboxFormat(StrEnum): + TAR_BZ2 = "tar.bz2" + + +class SandboxInfo(BaseModel): + checksum_algorithm: ChecksumAlgorithm + checksum: str = Field(pattern=r"^[0-9a-f]{64}$") + size: int = Field(ge=1) + format: SandboxFormat diff --git a/src/diracx/db/sql/sandbox_metadata/db.py b/src/diracx/db/sql/sandbox_metadata/db.py index 6900f58a..a95dd93f 100644 --- a/src/diracx/db/sql/sandbox_metadata/db.py +++ b/src/diracx/db/sql/sandbox_metadata/db.py @@ -1,80 +1,88 @@ -""" SandboxMetadataDB frontend -""" - from __future__ import annotations -import datetime - import sqlalchemy -from diracx.db.sql.utils import BaseSQLDB +from diracx.core.models import SandboxInfo, UserInfo +from diracx.db.sql.utils import BaseSQLDB, utcnow from .schema import Base as SandboxMetadataDBBase from .schema import sb_Owners, sb_SandBoxes +# In legacy DIRAC the SEName column was used to support multiple different +# storage backends. This is no longer the case, so we hardcode one SEName +# and use an S3 PFN prefix to represent the new DiracX system.
+SE_NAME = "ProductionSandboxSE" +PFN_PREFIX = "/S3/" + class SandboxMetadataDB(BaseSQLDB): metadata = SandboxMetadataDBBase.metadata - async def _get_put_owner(self, owner: str, owner_group: str) -> int: - """adds a new owner/ownerGroup pairs, while returning their ID if already existing - - Args: - owner (str): user name - owner_group (str): group of the owner - """ + async def upsert_owner(self, user: UserInfo) -> int: + """Return the owner's id, inserting the (username, group) row if missing""" + # TODO: Follow https://github.com/DIRACGrid/diracx/issues/49 stmt = sqlalchemy.select(sb_Owners.OwnerID).where( - sb_Owners.Owner == owner, sb_Owners.OwnerGroup == owner_group + sb_Owners.Owner == user.preferred_username, + sb_Owners.OwnerGroup == user.dirac_group, + # TODO: Add VO ) result = await self.conn.execute(stmt) if owner_id := result.scalar_one_or_none(): return owner_id - stmt = sqlalchemy.insert(sb_Owners).values(Owner=owner, OwnerGroup=owner_group) + stmt = sqlalchemy.insert(sb_Owners).values( + Owner=user.preferred_username, + OwnerGroup=user.dirac_group, + ) result = await self.conn.execute(stmt) return result.lastrowid - async def insert( - self, owner: str, owner_group: str, sb_SE: str, se_PFN: str, size: int = 0 - ) -> tuple[int, bool]: - """inserts a new sandbox in SandboxMetadataDB - this is "equivalent" of DIRAC registerAndGetSandbox + @staticmethod + def get_pfn(bucket_name: str, user: UserInfo, sandbox_info: SandboxInfo) -> str: + """Get the sandbox's user namespaced and content addressed PFN""" + parts = [ + "S3", + bucket_name, + user.vo, + user.dirac_group, + user.preferred_username, + f"{sandbox_info.checksum_algorithm}:{sandbox_info.checksum}.{sandbox_info.format}", + ] + return "/" + "/".join(parts) - Args: - owner (str): user name_ - owner_group (str): groupd of the owner - sb_SE (str): _description_ - sb_PFN (str): _description_ - size (int, optional): _description_. Defaults to 0.
- """ - owner_id = await self._get_put_owner(owner, owner_group) + async def insert_sandbox(self, user: UserInfo, pfn: str, size: int) -> None: + """Add a new sandbox, or bump its last access time on IntegrityError""" + # TODO: Follow https://github.com/DIRACGrid/diracx/issues/49 + owner_id = await self.upsert_owner(user) stmt = sqlalchemy.insert(sb_SandBoxes).values( - OwnerId=owner_id, SEName=sb_SE, SEPFN=se_PFN, Bytes=size + OwnerId=owner_id, + SEName=SE_NAME, + SEPFN=pfn, + Bytes=size, + RegistrationTime=utcnow(), + LastAccessTime=utcnow(), ) try: result = await self.conn.execute(stmt) - return result.lastrowid except sqlalchemy.exc.IntegrityError: - # it is a duplicate, try to retrieve SBiD - stmt: sqlalchemy.Executable = sqlalchemy.select(sb_SandBoxes.SBId).where( # type: ignore[no-redef] - sb_SandBoxes.SEPFN == se_PFN, - sb_SandBoxes.SEName == sb_SE, - sb_SandBoxes.OwnerId == owner_id, - ) - result = await self.conn.execute(stmt) - sb_ID = result.scalar_one() - stmt: sqlalchemy.Executable = ( # type: ignore[no-redef] - sqlalchemy.update(sb_SandBoxes) - .where(sb_SandBoxes.SBId == sb_ID) - .values(LastAccessTime=datetime.datetime.utcnow()) - ) - await self.conn.execute(stmt) - return sb_ID + await self.update_sandbox_last_access_time(pfn) + else: + assert result.rowcount == 1 - async def delete(self, sandbox_ids: list[int]) -> bool: - stmt: sqlalchemy.Executable = sqlalchemy.delete(sb_SandBoxes).where( - sb_SandBoxes.SBId.in_(sandbox_ids) + async def update_sandbox_last_access_time(self, pfn: str) -> None: + stmt = ( + sqlalchemy.update(sb_SandBoxes) + .where(sb_SandBoxes.SEName == SE_NAME, sb_SandBoxes.SEPFN == pfn) + .values(LastAccessTime=utcnow()) ) - await self.conn.execute(stmt) + result = await self.conn.execute(stmt) + assert result.rowcount == 1 - return True + async def sandbox_is_assigned(self, pfn: str) -> bool: + """Return whether the sandbox is assigned (NoResultFound if it is unknown).""" + stmt: sqlalchemy.Executable = sqlalchemy.select(sb_SandBoxes.Assigned).where( + sb_SandBoxes.SEName == 
SE_NAME, sb_SandBoxes.SEPFN == pfn + ) + result = await self.conn.execute(stmt) + is_assigned = result.scalar_one() + return is_assigned diff --git a/tests/db/test_sandboxMetadataDB.py b/tests/db/test_sandboxMetadataDB.py deleted file mode 100644 index ffcf91ec..00000000 --- a/tests/db/test_sandboxMetadataDB.py +++ /dev/null @@ -1,84 +0,0 @@ -from __future__ import annotations - -import pytest -import sqlalchemy - -from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB - - -@pytest.fixture -async def sandbox_metadata_db(tmp_path): - sandbox_metadata_db = SandboxMetadataDB("sqlite+aiosqlite:///:memory:") - async with sandbox_metadata_db.engine_context(): - async with sandbox_metadata_db.engine.begin() as conn: - await conn.run_sync(sandbox_metadata_db.metadata.create_all) - yield sandbox_metadata_db - - -async def test__get_put_owner(sandbox_metadata_db): - async with sandbox_metadata_db as sandbox_metadata_db: - result = await sandbox_metadata_db._get_put_owner("owner", "owner_group") - assert result == 1 - result = await sandbox_metadata_db._get_put_owner("owner_2", "owner_group") - assert result == 2 - result = await sandbox_metadata_db._get_put_owner("owner", "owner_group") - assert result == 1 - result = await sandbox_metadata_db._get_put_owner("owner_2", "owner_group") - assert result == 2 - result = await sandbox_metadata_db._get_put_owner("owner_2", "owner_group_2") - assert result == 3 - - -async def test_insert(sandbox_metadata_db): - async with sandbox_metadata_db as sandbox_metadata_db: - result = await sandbox_metadata_db.insert( - "owner", - "owner_group", - "sbSE", - "sbPFN", - 123, - ) - assert result == 1 - - result = await sandbox_metadata_db.insert( - "owner", - "owner_group", - "sbSE", - "sbPFN", - 123, - ) - assert result == 1 - - result = await sandbox_metadata_db.insert( - "owner_2", - "owner_group", - "sbSE", - "sbPFN_2", - 123, - ) - assert result == 2 - - # This would be incorrect - with pytest.raises(sqlalchemy.exc.NoResultFound):
- await sandbox_metadata_db.insert( - "owner", - "owner_group", - "sbSE", - "sbPFN_2", - 123, - ) - - -async def test_delete(sandbox_metadata_db): - async with sandbox_metadata_db as sandbox_metadata_db: - result = await sandbox_metadata_db.insert( - "owner", - "owner_group", - "sbSE", - "sbPFN", - 123, - ) - assert result == 1 - - result = await sandbox_metadata_db.delete([1]) - assert result diff --git a/tests/db/test_sandbox_metadata.py b/tests/db/test_sandbox_metadata.py new file mode 100644 index 00000000..6757af13 --- /dev/null +++ b/tests/db/test_sandbox_metadata.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import asyncio +import secrets +from datetime import datetime + +import pytest +import sqlalchemy + +from diracx.core.models import SandboxInfo, UserInfo +from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB +from diracx.db.sql.sandbox_metadata.schema import sb_SandBoxes + + +@pytest.fixture +async def sandbox_metadata_db(tmp_path): + sandbox_metadata_db = SandboxMetadataDB("sqlite+aiosqlite:///:memory:") + async with sandbox_metadata_db.engine_context(): + async with sandbox_metadata_db.engine.begin() as conn: + await conn.run_sync(sandbox_metadata_db.metadata.create_all) + yield sandbox_metadata_db + + +def test_get_pfn(sandbox_metadata_db: SandboxMetadataDB): + user_info = UserInfo( + sub="vo:sub", preferred_username="user1", dirac_group="group1", vo="vo" + ) + sandbox_info = SandboxInfo( + checksum="a" * 64, + checksum_algorithm="sha256", + format="tar.bz2", + size=100, + ) + pfn = sandbox_metadata_db.get_pfn("bucket1", user_info, sandbox_info) + assert pfn == f"/S3/bucket1/vo/group1/user1/sha256:{'a' * 64}.tar.bz2" + + +async def test_insert_sandbox(sandbox_metadata_db: SandboxMetadataDB): + user_info = UserInfo( + sub="vo:sub", preferred_username="user1", dirac_group="group1", vo="vo" + ) + pfn1 = secrets.token_hex() + + # Make sure the sandbox doesn't already exist + db_contents = await _dump_db(sandbox_metadata_db) + assert 
pfn1 not in db_contents + async with sandbox_metadata_db: + with pytest.raises(sqlalchemy.exc.NoResultFound): + await sandbox_metadata_db.sandbox_is_assigned(pfn1) + + # Insert the sandbox + async with sandbox_metadata_db: + await sandbox_metadata_db.insert_sandbox(user_info, pfn1, 100) + db_contents = await _dump_db(sandbox_metadata_db) + owner_id1, last_access_time1 = db_contents[pfn1] + + # Inserting again should update the last access time + await asyncio.sleep(1) # The timestamp only has second precision + async with sandbox_metadata_db: + await sandbox_metadata_db.insert_sandbox(user_info, pfn1, 100) + db_contents = await _dump_db(sandbox_metadata_db) + owner_id2, last_access_time2 = db_contents[pfn1] + assert owner_id1 == owner_id2 + assert last_access_time2 > last_access_time1 + + # The sandbox still hasn't been assigned + async with sandbox_metadata_db: + assert not await sandbox_metadata_db.sandbox_is_assigned(pfn1) + + # Reads leave the last access time alone; an explicit update bumps it + await asyncio.sleep(1) # The timestamp only has second precision + last_access_time3 = (await _dump_db(sandbox_metadata_db))[pfn1][1] + assert last_access_time2 == last_access_time3 + async with sandbox_metadata_db: + await sandbox_metadata_db.update_sandbox_last_access_time(pfn1) + last_access_time4 = (await _dump_db(sandbox_metadata_db))[pfn1][1] + assert last_access_time2 < last_access_time4 + + +async def _dump_db( + sandbox_metadata_db: SandboxMetadataDB, +) -> dict[str, tuple[int, datetime]]: + """Dump the contents of the sandbox metadata database + + Returns a dict[pfn: str, (owner_id: int, last_access_time: datetime)] + """ + async with sandbox_metadata_db: + stmt = sqlalchemy.select( + sb_SandBoxes.SEPFN, sb_SandBoxes.OwnerId, sb_SandBoxes.LastAccessTime + ) + res = await sandbox_metadata_db.conn.execute(stmt) + return {row.SEPFN: (row.OwnerId, row.LastAccessTime) for row in res}