Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for getting DiracX installed on lbcertifidirac70 #126

Merged
merged 6 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/diracx/db/sql/jobs/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class Jobs(JobDBBase):

JobID = Column("JobID", Integer, primary_key=True, default=0)
JobType = Column("JobType", String(32), default="user")
DIRACSetup = Column("DIRACSetup", String(32), default="test")
JobGroup = Column("JobGroup", String(32), default="00000000")
JobSplitType = Column(
"JobSplitType", Enum("Single", "Master", "Subjob", "DAGNode"), default="Single"
Expand All @@ -68,7 +67,6 @@ class Jobs(JobDBBase):
Site = Column("Site", String(100), default="ANY")
JobName = Column("JobName", String(128), default="Unknown")
Owner = Column("Owner", String(64), default="Unknown")
OwnerDN = Column("OwnerDN", String(255), default="Unknown")
OwnerGroup = Column("OwnerGroup", String(128), default="Unknown")
SubmissionTime = NullColumn("SubmissionTime", DateTime)
RescheduleTime = NullColumn("RescheduleTime", DateTime)
Expand Down
22 changes: 9 additions & 13 deletions src/diracx/db/sql/sandbox_metadata/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@
from .schema import Base as SandboxMetadataDBBase
from .schema import sb_Owners, sb_SandBoxes

# In legacy DIRAC the SEName column was used to support multiple different
# storage backends. This is no longer the case, so we hardcode the value to
# S3 to represent the new DiracX system.
SE_NAME = "ProductionSandboxSE"
PFN_PREFIX = "/S3/"


class SandboxMetadataDB(BaseSQLDB):
metadata = SandboxMetadataDBBase.metadata
Expand Down Expand Up @@ -50,13 +44,15 @@ def get_pfn(bucket_name: str, user: UserInfo, sandbox_info: SandboxInfo) -> str:
]
return "/" + "/".join(parts)

async def insert_sandbox(self, user: UserInfo, pfn: str, size: int) -> None:
async def insert_sandbox(
self, se_name: str, user: UserInfo, pfn: str, size: int
) -> None:
"""Add a new sandbox in SandboxMetadataDB"""
# TODO: Follow https://github.com/DIRACGrid/diracx/issues/49
owner_id = await self.upsert_owner(user)
stmt = sqlalchemy.insert(sb_SandBoxes).values(
OwnerId=owner_id,
SEName=SE_NAME,
SEName=se_name,
SEPFN=pfn,
Bytes=size,
RegistrationTime=utcnow(),
Expand All @@ -65,23 +61,23 @@ async def insert_sandbox(self, user: UserInfo, pfn: str, size: int) -> None:
try:
result = await self.conn.execute(stmt)
except sqlalchemy.exc.IntegrityError:
await self.update_sandbox_last_access_time(pfn)
await self.update_sandbox_last_access_time(se_name, pfn)
else:
assert result.rowcount == 1

async def update_sandbox_last_access_time(self, pfn: str) -> None:
async def update_sandbox_last_access_time(self, se_name: str, pfn: str) -> None:
stmt = (
sqlalchemy.update(sb_SandBoxes)
.where(sb_SandBoxes.SEName == SE_NAME, sb_SandBoxes.SEPFN == pfn)
.where(sb_SandBoxes.SEName == se_name, sb_SandBoxes.SEPFN == pfn)
.values(LastAccessTime=utcnow())
)
result = await self.conn.execute(stmt)
assert result.rowcount == 1

async def sandbox_is_assigned(self, pfn: str) -> bool:
async def sandbox_is_assigned(self, se_name: str, pfn: str) -> bool:
"""Checks if a sandbox exists and has been assigned."""
stmt: sqlalchemy.Executable = sqlalchemy.select(sb_SandBoxes.Assigned).where(
sb_SandBoxes.SEName == SE_NAME, sb_SandBoxes.SEPFN == pfn
sb_SandBoxes.SEName == se_name, sb_SandBoxes.SEPFN == pfn
)
result = await self.conn.execute(stmt)
is_assigned = result.scalar_one()
Expand Down
5 changes: 1 addition & 4 deletions src/diracx/db/sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ async def engine_context(self) -> AsyncIterator[None]:
"""
assert self._engine is None, "engine_context cannot be nested"

engine = create_async_engine(
self._db_url,
echo=True,
)
engine = create_async_engine(self._db_url)
self._engine = engine

yield
Expand Down
31 changes: 28 additions & 3 deletions src/diracx/routers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,13 @@

# Extract attributes from the OIDC token details
sub = oidc_token_info["sub"]
preferred_username = oidc_token_info.get("preferred_username", sub)
if user_info := config.Registry[vo].Users.get(sub):
preferred_username = user_info.PreferedUsername
else:
preferred_username = oidc_token_info.get("preferred_username", sub)

Check warning on line 269 in src/diracx/routers/auth.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/routers/auth.py#L269

Added line #L269 was not covered by tests
raise NotImplementedError(
"Dynamic registration of users is not yet implemented"
)

# Extract attributes from the settings and configuration
issuer = settings.token_issuer
Expand Down Expand Up @@ -475,7 +481,7 @@
(note: it can't be put as parameter or in the URL)
"""

# Here we make sure the user_code actualy exists
# Here we make sure the user_code actually exists
scope = await auth_db.device_flow_validate_user_code(
user_code, settings.device_flow_expiration_seconds
)
Expand Down Expand Up @@ -1032,7 +1038,26 @@
settings: AuthSettings,
config: Config,
):
"""Endpoint used by legacy DIRAC to mint tokens for proxy -> token exchange."""
"""Endpoint used by legacy DIRAC to mint tokens for proxy -> token exchange.

This route is disabled if DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY is not set
in the environment.

If legacy token exchange is required, an API key must be included in the
request. This can be generated with the following python code::

import secrets
import base64
import hashlib
token = secrets.token_bytes()

# This is the secret to include in the request
print(f"API key is diracx:legacy:{base64.urlsafe_b64encode(token).decode()}")

# This is the environment variable to set on the DiracX server
print(f"DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY={hashlib.sha256(token).hexdigest()}")

"""
if not (
expected_api_key := os.environ.get("DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY")
):
Expand Down
23 changes: 16 additions & 7 deletions src/diracx/routers/job_manager/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
s3_client_kwargs: dict[str, str]
auto_create_bucket: bool = False
url_validity_seconds: int = 5 * 60
se_name: str = "SandboxSE"
_client: S3Client = PrivateAttr(None)

@contextlib.asynccontextmanager
Expand Down Expand Up @@ -97,9 +98,12 @@
)

pfn = sandbox_metadata_db.get_pfn(settings.bucket_name, user_info, sandbox_info)
full_pfn = f"SB:{settings.se_name}|{pfn}"

try:
exists_and_assigned = await sandbox_metadata_db.sandbox_is_assigned(pfn)
exists_and_assigned = await sandbox_metadata_db.sandbox_is_assigned(
settings.se_name, pfn
)
except NoResultFound:
# The sandbox doesn't exist in the database
pass
Expand All @@ -112,8 +116,10 @@
if exists_and_assigned or s3_object_exists(
settings.s3_client, settings.bucket_name, pfn_to_key(pfn)
):
await sandbox_metadata_db.update_sandbox_last_access_time(pfn)
return SandboxUploadResponse(pfn=pfn)
await sandbox_metadata_db.update_sandbox_last_access_time(

Check warning on line 119 in src/diracx/routers/job_manager/sandboxes.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/routers/job_manager/sandboxes.py#L119

Added line #L119 was not covered by tests
settings.se_name, pfn
)
return SandboxUploadResponse(pfn=full_pfn)

Check warning on line 122 in src/diracx/routers/job_manager/sandboxes.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/routers/job_manager/sandboxes.py#L122

Added line #L122 was not covered by tests

upload_info = await generate_presigned_upload(
settings.s3_client,
Expand All @@ -124,9 +130,11 @@
sandbox_info.size,
settings.url_validity_seconds,
)
await sandbox_metadata_db.insert_sandbox(user_info, pfn, sandbox_info.size)
await sandbox_metadata_db.insert_sandbox(

Check warning on line 133 in src/diracx/routers/job_manager/sandboxes.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/routers/job_manager/sandboxes.py#L133

Added line #L133 was not covered by tests
settings.se_name, user_info, pfn, sandbox_info.size
)

return SandboxUploadResponse(**upload_info, pfn=pfn)
return SandboxUploadResponse(**upload_info, pfn=full_pfn)

Check warning on line 137 in src/diracx/routers/job_manager/sandboxes.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/routers/job_manager/sandboxes.py#L137

Added line #L137 was not covered by tests


class SandboxDownloadResponse(BaseModel):
Expand All @@ -143,8 +151,8 @@


SANDBOX_PFN_REGEX = (
# Starts with /S3/<bucket_name>
r"^/S3/[a-z0-9\.\-]{3,63}"
# Starts with /S3/<bucket_name> or SB:<se_name>|/S3/<bucket_name>
r"^(?:SB:[A-Za-z]+\|)?/S3/[a-z0-9\.\-]{3,63}"
# Followed /<vo>/<group>/<username>/<checksum_algorithm>:<checksum>.<format>
r"(?:/[^/]+){3}/[a-z0-9]{3,10}:[0-9a-f]{64}\.[a-z0-9\.]+$"
)
Expand All @@ -164,6 +172,7 @@
most storage backends return an error when they receive an authorization
header for a presigned URL.
"""
pfn = pfn.split("|", 1)[-1]
required_prefix = (
"/"
+ "/".join(
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def pytest_collection_modifyitems(config, items):
# --regenerate-client given in cli: allow client re-generation
return
skip_regen = pytest.mark.skip(reason="need --regenerate-client option to run")
found = False
for item in items:
if item.name == "test_regenerate_client":
item.add_marker(skip_regen)
Expand Down
10 changes: 5 additions & 5 deletions tests/db/test_sandbox_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,33 @@ async def test_insert_sandbox(sandbox_metadata_db: SandboxMetadataDB):
assert pfn1 not in db_contents
async with sandbox_metadata_db:
with pytest.raises(sqlalchemy.exc.NoResultFound):
await sandbox_metadata_db.sandbox_is_assigned(pfn1)
await sandbox_metadata_db.sandbox_is_assigned("SandboxSE", pfn1)

# Insert the sandbox
async with sandbox_metadata_db:
await sandbox_metadata_db.insert_sandbox(user_info, pfn1, 100)
await sandbox_metadata_db.insert_sandbox("SandboxSE", user_info, pfn1, 100)
db_contents = await _dump_db(sandbox_metadata_db)
owner_id1, last_access_time1 = db_contents[pfn1]

# Inserting again should update the last access time
await asyncio.sleep(1) # The timestamp only has second precision
async with sandbox_metadata_db:
await sandbox_metadata_db.insert_sandbox(user_info, pfn1, 100)
await sandbox_metadata_db.insert_sandbox("SandboxSE", user_info, pfn1, 100)
db_contents = await _dump_db(sandbox_metadata_db)
owner_id2, last_access_time2 = db_contents[pfn1]
assert owner_id1 == owner_id2
assert last_access_time2 > last_access_time1

# The sandbox still hasn't been assigned
async with sandbox_metadata_db:
assert not await sandbox_metadata_db.sandbox_is_assigned(pfn1)
assert not await sandbox_metadata_db.sandbox_is_assigned("SandboxSE", pfn1)

# Inserting again should update the last access time
await asyncio.sleep(1) # The timestamp only has second precision
last_access_time3 = (await _dump_db(sandbox_metadata_db))[pfn1][1]
assert last_access_time2 == last_access_time3
async with sandbox_metadata_db:
await sandbox_metadata_db.update_sandbox_last_access_time(pfn1)
await sandbox_metadata_db.update_sandbox_last_access_time("SandboxSE", pfn1)
last_access_time4 = (await _dump_db(sandbox_metadata_db))[pfn1][1]
assert last_access_time2 < last_access_time4

Expand Down
2 changes: 1 addition & 1 deletion tests/routers/jobs/test_sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_upload_then_download(
upload_info = r.json()
assert upload_info["url"]
sandbox_pfn = upload_info["pfn"]
assert sandbox_pfn.startswith("/S3/")
assert sandbox_pfn.startswith("SB:SandboxSE|/S3/")

# Actually upload the file
files = {"file": ("file", BytesIO(data))}
Expand Down