-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
161 additions
and
134 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,80 +1,88 @@ | ||
""" SandboxMetadataDB frontend | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
import datetime | ||
|
||
import sqlalchemy | ||
|
||
from diracx.db.sql.utils import BaseSQLDB | ||
from diracx.core.models import SandboxInfo, UserInfo | ||
from diracx.db.sql.utils import BaseSQLDB, utcnow | ||
|
||
from .schema import Base as SandboxMetadataDBBase | ||
from .schema import sb_Owners, sb_SandBoxes | ||
|
||
# In legacy DIRAC the SEName column was used to support multiple different | ||
# storage backends. This is no longer the case, so we hardcode the value to | ||
# S3 to represent the new DiracX system. | ||
SE_NAME = "ProductionSandboxSE" | ||
PFN_PREFIX = "/S3/" | ||
|
||
|
||
class SandboxMetadataDB(BaseSQLDB): | ||
metadata = SandboxMetadataDBBase.metadata | ||
|
||
async def _get_put_owner(self, owner: str, owner_group: str) -> int: | ||
"""adds a new owner/ownerGroup pairs, while returning their ID if already existing | ||
Args: | ||
owner (str): user name | ||
owner_group (str): group of the owner | ||
""" | ||
async def upsert_owner(self, user: UserInfo) -> int: | ||
"""Get the id of the owner from the database""" | ||
# TODO: Follow https://github.com/DIRACGrid/diracx/issues/49 | ||
stmt = sqlalchemy.select(sb_Owners.OwnerID).where( | ||
sb_Owners.Owner == owner, sb_Owners.OwnerGroup == owner_group | ||
sb_Owners.Owner == user.preferred_username, | ||
sb_Owners.OwnerGroup == user.dirac_group, | ||
# TODO: Add VO | ||
) | ||
result = await self.conn.execute(stmt) | ||
if owner_id := result.scalar_one_or_none(): | ||
return owner_id | ||
|
||
stmt = sqlalchemy.insert(sb_Owners).values(Owner=owner, OwnerGroup=owner_group) | ||
stmt = sqlalchemy.insert(sb_Owners).values( | ||
Owner=user.preferred_username, | ||
OwnerGroup=user.dirac_group, | ||
) | ||
result = await self.conn.execute(stmt) | ||
return result.lastrowid | ||
|
||
async def insert( | ||
self, owner: str, owner_group: str, sb_SE: str, se_PFN: str, size: int = 0 | ||
) -> tuple[int, bool]: | ||
"""inserts a new sandbox in SandboxMetadataDB | ||
this is "equivalent" of DIRAC registerAndGetSandbox | ||
@staticmethod | ||
def get_pfn(bucket_name: str, user: UserInfo, sandbox_info: SandboxInfo) -> str: | ||
"""Get the sandbox's user namespaced and content addressed PFN""" | ||
parts = [ | ||
"S3", | ||
bucket_name, | ||
user.vo, | ||
user.dirac_group, | ||
user.preferred_username, | ||
f"{sandbox_info.checksum_algorithm}:{sandbox_info.checksum}.{sandbox_info.format}", | ||
] | ||
return "/" + "/".join(parts) | ||
|
||
Args: | ||
owner (str): user name_ | ||
owner_group (str): groupd of the owner | ||
sb_SE (str): _description_ | ||
sb_PFN (str): _description_ | ||
size (int, optional): _description_. Defaults to 0. | ||
""" | ||
owner_id = await self._get_put_owner(owner, owner_group) | ||
async def insert_sandbox(self, user: UserInfo, pfn: str, size: int) -> None: | ||
"""Add a new sandbox in SandboxMetadataDB""" | ||
# TODO: Follow https://github.com/DIRACGrid/diracx/issues/49 | ||
owner_id = await self.upsert_owner(user) | ||
stmt = sqlalchemy.insert(sb_SandBoxes).values( | ||
OwnerId=owner_id, SEName=sb_SE, SEPFN=se_PFN, Bytes=size | ||
OwnerId=owner_id, | ||
SEName=SE_NAME, | ||
SEPFN=pfn, | ||
Bytes=size, | ||
RegistrationTime=utcnow(), | ||
LastAccessTime=utcnow(), | ||
) | ||
try: | ||
result = await self.conn.execute(stmt) | ||
return result.lastrowid | ||
except sqlalchemy.exc.IntegrityError: | ||
# it is a duplicate, try to retrieve SBiD | ||
stmt: sqlalchemy.Executable = sqlalchemy.select(sb_SandBoxes.SBId).where( # type: ignore[no-redef] | ||
sb_SandBoxes.SEPFN == se_PFN, | ||
sb_SandBoxes.SEName == sb_SE, | ||
sb_SandBoxes.OwnerId == owner_id, | ||
) | ||
result = await self.conn.execute(stmt) | ||
sb_ID = result.scalar_one() | ||
stmt: sqlalchemy.Executable = ( # type: ignore[no-redef] | ||
sqlalchemy.update(sb_SandBoxes) | ||
.where(sb_SandBoxes.SBId == sb_ID) | ||
.values(LastAccessTime=datetime.datetime.utcnow()) | ||
) | ||
await self.conn.execute(stmt) | ||
return sb_ID | ||
await self.update_sandbox_last_access_time(pfn) | ||
else: | ||
assert result.rowcount == 1 | ||
|
||
async def delete(self, sandbox_ids: list[int]) -> bool: | ||
stmt: sqlalchemy.Executable = sqlalchemy.delete(sb_SandBoxes).where( | ||
sb_SandBoxes.SBId.in_(sandbox_ids) | ||
async def update_sandbox_last_access_time(self, pfn: str) -> None: | ||
stmt = ( | ||
sqlalchemy.update(sb_SandBoxes) | ||
.where(sb_SandBoxes.SEName == SE_NAME, sb_SandBoxes.SEPFN == pfn) | ||
.values(LastAccessTime=utcnow()) | ||
) | ||
await self.conn.execute(stmt) | ||
result = await self.conn.execute(stmt) | ||
assert result.rowcount == 1 | ||
|
||
return True | ||
async def sandbox_is_assigned(self, pfn: str) -> bool: | ||
"""Checks if a sandbox exists and has been assigned.""" | ||
stmt: sqlalchemy.Executable = sqlalchemy.select(sb_SandBoxes.Assigned).where( | ||
sb_SandBoxes.SEName == SE_NAME, sb_SandBoxes.SEPFN == pfn | ||
) | ||
result = await self.conn.execute(stmt) | ||
is_assigned = result.scalar_one() | ||
return is_assigned |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
import secrets | ||
from datetime import datetime | ||
|
||
import pytest | ||
import sqlalchemy | ||
|
||
from diracx.core.models import SandboxInfo, UserInfo | ||
from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB | ||
from diracx.db.sql.sandbox_metadata.schema import sb_SandBoxes | ||
|
||
|
||
@pytest.fixture | ||
async def sandbox_metadata_db(tmp_path): | ||
sandbox_metadata_db = SandboxMetadataDB("sqlite+aiosqlite:///:memory:") | ||
async with sandbox_metadata_db.engine_context(): | ||
async with sandbox_metadata_db.engine.begin() as conn: | ||
await conn.run_sync(sandbox_metadata_db.metadata.create_all) | ||
yield sandbox_metadata_db | ||
|
||
|
||
def test_get_pfn(sandbox_metadata_db: SandboxMetadataDB): | ||
user_info = UserInfo( | ||
sub="vo:sub", preferred_username="user1", dirac_group="group1", vo="vo" | ||
) | ||
sandbox_info = SandboxInfo( | ||
checksum="checksum", | ||
checksum_algorithm="sha256", | ||
format="tar.bz2", | ||
size=100, | ||
) | ||
pfn = sandbox_metadata_db.get_pfn("bucket1", user_info, sandbox_info) | ||
assert pfn == "/S3/bucket1/vo/group1/user1/sha256:checksum.tar.bz2" | ||
|
||
|
||
async def test_insert_sandbox(sandbox_metadata_db: SandboxMetadataDB): | ||
user_info = UserInfo( | ||
sub="vo:sub", preferred_username="user1", dirac_group="group1", vo="vo" | ||
) | ||
pfn1 = secrets.token_hex() | ||
|
||
# Make sure the sandbox doesn't already exist | ||
db_contents = await _dump_db(sandbox_metadata_db) | ||
assert pfn1 not in db_contents | ||
async with sandbox_metadata_db: | ||
with pytest.raises(sqlalchemy.exc.NoResultFound): | ||
await sandbox_metadata_db.sandbox_is_assigned(pfn1) | ||
|
||
# Insert the sandbox | ||
async with sandbox_metadata_db: | ||
await sandbox_metadata_db.insert_sandbox(user_info, pfn1, 100) | ||
db_contents = await _dump_db(sandbox_metadata_db) | ||
owner_id1, last_access_time1 = db_contents[pfn1] | ||
|
||
# Inserting again should update the last access time | ||
await asyncio.sleep(1) # The timestamp only has second precision | ||
async with sandbox_metadata_db: | ||
await sandbox_metadata_db.insert_sandbox(user_info, pfn1, 100) | ||
db_contents = await _dump_db(sandbox_metadata_db) | ||
owner_id2, last_access_time2 = db_contents[pfn1] | ||
assert owner_id1 == owner_id2 | ||
assert last_access_time2 > last_access_time1 | ||
|
||
# The sandbox still hasn't been assigned | ||
async with sandbox_metadata_db: | ||
assert not await sandbox_metadata_db.sandbox_is_assigned(pfn1) | ||
|
||
# Inserting again should update the last access time | ||
await asyncio.sleep(1) # The timestamp only has second precision | ||
last_access_time3 = (await _dump_db(sandbox_metadata_db))[pfn1][1] | ||
assert last_access_time2 == last_access_time3 | ||
async with sandbox_metadata_db: | ||
await sandbox_metadata_db.update_sandbox_last_access_time(pfn1) | ||
last_access_time4 = (await _dump_db(sandbox_metadata_db))[pfn1][1] | ||
assert last_access_time2 < last_access_time4 | ||
|
||
|
||
async def _dump_db( | ||
sandbox_metadata_db: SandboxMetadataDB, | ||
) -> dict[str, tuple[int, datetime]]: | ||
"""Dump the contents of the sandbox metadata database | ||
Returns a dict[pfn: str, (owner_id: int, last_access_time: datetime)] | ||
""" | ||
async with sandbox_metadata_db: | ||
stmt = sqlalchemy.select( | ||
sb_SandBoxes.SEPFN, sb_SandBoxes.OwnerId, sb_SandBoxes.LastAccessTime | ||
) | ||
res = await sandbox_metadata_db.conn.execute(stmt) | ||
return {row.SEPFN: (row.OwnerId, row.LastAccessTime) for row in res} |