Skip to content

Commit

Permalink
Implement file upload server (#1)
Browse files Browse the repository at this point in the history
This PR implements a file upload server that can be used as an file
manager for the CVMFS.

- [x] Implement notification on file change
- [x] Automate garbage collection
  • Loading branch information
ben-z authored Oct 14, 2024
1 parent a4bd2f2 commit 98f236a
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 9 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ A CVMFS stratum 0 server meant fo storing ephemeral data. The main features are:
These features make it suitable for storing short-lived artifacts in CI/CD pipelines.

Coming soon:
- [ ] File upload API (we may be able to simply use the [publisher](https://cvmfs.readthedocs.io/en/stable/cpt-repository-gateway.html#publisher-configuration). It has nice features like being able to handle concurrent transactions.)
- [ ] Garbage collection
- [x] File upload API (we may be able to simply use the [publisher](https://cvmfs.readthedocs.io/en/stable/cpt-repository-gateway.html#publisher-configuration). It has nice features like being able to handle concurrent transactions.)
- The publisher appears to be bottlenecked at 20MiB/s when running the server in Kubernetes, and around 80MiB/s when running in Docker. `iperf` gives much higher bandwidth (between nodes and between the Kubernetes container and nodes), so it's likely not a network bottleneck.
- When using the custom FastAPI upload server, speeds reach over 400MiB/s easily. We'll adopt this approach.
- [x] Garbage collection
- [ ] Better documentation
- [ ] Automatic [whitelist re-signing](https://cvmfs.readthedocs.io/en/stable/apx-security.html#signature-details)

Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ services:
target: server

ports:
# cvmfs file server
- "8080:80"
# application server
- "8081:81"
# cvmfs gateway
- "4929:4929"

volumes:
Expand Down
5 changes: 4 additions & 1 deletion server/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
watcloud-utils @ git+https://github.com/WATonomous/watcloud-utils.git@c8ce1006716e65971f750560f90f442721b3777d
python-slugify>=8.0.4,<9
python-slugify>=8.0.4,<9
python-multipart>=0.0.12,<1
uvicorn>=0.31.1,<1
apscheduler>=3.10.4,<4
206 changes: 200 additions & 6 deletions server/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,39 @@
import string
import subprocess
import sys
import time
from contextlib import asynccontextmanager
from pathlib import Path
from threading import Lock

import typer
import uvicorn
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import HTTPException, UploadFile
from fastapi.responses import FileResponse
from slugify import slugify
from typing_extensions import Annotated
from watcloud_utils.fastapi import WATcloudFastAPI, FastAPI
from watcloud_utils.logging import logger, set_up_logging
from watcloud_utils.typer import app

set_up_logging()

@app.command()
def init_cvmfs_repo(repo_name: str):
def init_cvmfs_repo(
repo_name: Annotated[str, typer.Argument(help="Name of the CVMFS repo. CVMFS requires this to be an FQDN.")],
volatile: Annotated[bool, typer.Option(help="Whether the repo is volatile or not. If True, the repo will be created (cvmfs_server mkfs) with the -v flag.")] = True,
enable_garbage_collection: Annotated[bool, typer.Option(help="Whether to enable garbage collection for the repo.")] = True,
disable_auto_tag: Annotated[bool, typer.Option(help="Whether to disable auto-tagging for the repo.")] = True,
compression_algorithm: Annotated[str, typer.Option(help="Compression algorithm to use for the repo.")] = "none",
file_mbyte_limit: Annotated[int, typer.Option(help="Maximum file size in MiB that can be uploaded to the repo.")] = 4096,
):
"""
Initialize a CVMFS repo.
Docs: https://cvmfs.readthedocs.io/en/stable/cpt-repo.html
"""
print(f"Initializing CVMFS repo: {repo_name}")

# Make apache2 serve cvmfs repos
Expand All @@ -29,10 +54,23 @@ def init_cvmfs_repo(repo_name: str):
sys.exit(f"Failed to start apache2 service (exit code: {res.returncode})")

# Run cvmfs_server mkfs
res = subprocess.run(["cvmfs_server", "mkfs", "-o", "root", "-Z", "none", repo_name], check=True)
res = subprocess.run(
["cvmfs_server", "mkfs", "-o", "root", "-Z", compression_algorithm]
+ (["-v"] if volatile else [])
+ (["-z"] if enable_garbage_collection else [])
+ (["-g"] if disable_auto_tag else [])
+ [repo_name],
check=True
)
if res.returncode != 0:
sys.exit(f"Failed to run cvmfs_server mkfs (exit code: {res.returncode})")

# Populate repo configuration
repo_config_path = Path(f"/etc/cvmfs/repositories.d/{repo_name}/server.conf")
with open(repo_config_path, "a") as f:
f.write("\n")
f.write(f"CVMFS_FILE_MBYTE_LIMIT={file_mbyte_limit}\n")

# Make the public key and certificate available via HTTP
# Useful for clients and publishers:
# https://cvmfs.readthedocs.io/en/stable/cpt-repository-gateway.html#example-procedure
Expand Down Expand Up @@ -64,11 +102,167 @@ def init_cvmfs_repo(repo_name: str):
print(f"Successfully initialized CVMFS repo: {repo_name}")
print(f"The public key is available via HTTP at GET /cvmfs-meta/{repo_name}.pub")

@asynccontextmanager
async def fastapi_lifespan(app: FastAPI):
"""
This function wraps the FastAPI app in a lifespan context manager.
i.e. it allows us to run code when the app starts and stops.
"""
try:
scheduler.start()
# Run garbage collection every minute
scheduler.add_job(gc, CronTrigger.from_crontab("* * * * *"))
yield
finally:
scheduler.shutdown()

scheduler = BackgroundScheduler()
fastapi_app = WATcloudFastAPI(logger=logger, lifespan=fastapi_lifespan)
transaction_lock = Lock()

@fastapi_app.post("/repos/{repo_name}")
async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})")

# check if repo exists
if not Path(f"/cvmfs/{repo_name}").exists():
raise HTTPException(status_code=404, detail=f"Repo {repo_name} does not exist")

file_path = Path(f"/cvmfs/{repo_name}/{file.filename}")
if not overwrite and file_path.exists():
raise HTTPException(status_code=409, detail=f"File {file.filename} already exists")

with transaction_lock:
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

try:
# Remove existing file
if file_path.exists():
file_path.unlink()

# Upload file
with file_path.open("wb") as f:
upload_start = time.perf_counter()
f.write(await file.read())
upload_end = time.perf_counter()

logger.info(f"Uploaded file: {file.filename} (content_type: {file.content_type}). Took {upload_end - upload_start:.2f}s")
except Exception as e:
logger.error(f"Failed to upload file: {file.filename} (content_type: {file.content_type})")
logger.exception(e)
# abort transaction
subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True)
raise HTTPException(status_code=500, detail="Failed to upload file")

# publish transaction
publish_start = time.perf_counter()
subprocess.run(["cvmfs_server", "publish", repo_name], check=True)
publish_end = time.perf_counter()

logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s")

notify(repo_name)

return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start}

@fastapi_app.get("/repos/{repo_name}/{file_name}")
async def download(repo_name: str, file_name: str):
logger.info(f"Downloading file: {file_name} from repo: {repo_name}")

file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}")

return FileResponse(file_path)

@fastapi_app.get("/repos/{repo_name}")
async def list_files(repo_name: str):
logger.info(f"Listing files in repo: {repo_name}")

repo_path = Path(f"/cvmfs/{repo_name}")
if not repo_path.exists():
raise HTTPException(status_code=404, detail=f"Repo {repo_name} does not exist")

return {"files": [file.name for file in repo_path.iterdir() if file.is_file()]}

@fastapi_app.delete("/repos/{repo_name}/{file_name}")
async def delete_file(repo_name: str, file_name: str):
logger.info(f"Deleting file: {file_name} from repo: {repo_name}")

file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}")

with transaction_lock:
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

try:
# Remove file
file_path.unlink()
logger.info(f"Deleted file: {file_name} from repo: {repo_name}")
except Exception as e:
logger.error(f"Failed to delete file: {file_name} from repo: {repo_name}")
logger.exception(e)
# abort transaction
subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True)
raise HTTPException(status_code=500, detail="Failed to delete file")

# publish transaction
subprocess.run(["cvmfs_server", "publish", repo_name], check=True)
notify(repo_name)

return {"filename": file_name}

@app.command()
@fastapi_app.post("/gc")
def gc():
"""
Perform garbage collection on all repos.
"""
with transaction_lock:
logger.info("Running garbage collection")
gc_start = time.perf_counter()
subprocess.run(["cvmfs_server", "gc", "-r", "0", "-f"], check=True)
gc_end = time.perf_counter()
logger.info(f"Garbage collection completed. Took {gc_end - gc_start:.2f}s")
return {"message": "Garbage collection completed", "gc_time_s": gc_end - gc_start}


@app.command()
@fastapi_app.post("/repos/{repo_name}/notify")
def notify(repo_name: str):
"""
Use cvmfs-gateway to notify clients about changes in the repo.
"""
logger.info(f"Notifying clients about changes in repo: {repo_name}")

if not Path(f"/cvmfs/{repo_name}").exists():
raise HTTPException(status_code=404, detail=f"Repo {repo_name} does not exist")

subprocess.run(
[
"cvmfs_swissknife",
"notify",
# publish
"-p",
# notification server URL
"-u",
"http://localhost:4929/api/v1",
# URL of the repository
"-r",
f"http://localhost/cvmfs/{repo_name}",
],
check=True,
)

return {"message": f"Notified clients about changes in repo {repo_name}"}


@app.command()
def start_server():
print("Starting server")
while True:
pass
def start_server(port: int = 81):
uvicorn.run(fastapi_app, host="0.0.0.0", port=port)


if __name__ == "__main__":
Expand Down

0 comments on commit 98f236a

Please sign in to comment.