From 8b929e4a8626776d2437e54e474376005257a4c2 Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Sun, 13 Oct 2024 20:31:50 +0000 Subject: [PATCH 1/9] Bootstrap file upload server --- docker-compose.yml | 4 ++++ server/requirements.txt | 4 +++- server/src/main.py | 25 +++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 2c10c89..5a981b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,11 @@ services: target: server ports: + # cvmfs file server - "8080:80" + # application server + - "8081:81" + # cvmfs gateway - "4929:4929" volumes: diff --git a/server/requirements.txt b/server/requirements.txt index 12b1f68..afc0a41 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -1,2 +1,4 @@ watcloud-utils @ git+https://github.com/WATonomous/watcloud-utils.git@c8ce1006716e65971f750560f90f442721b3777d -python-slugify>=8.0.4,<9 \ No newline at end of file +python-slugify>=8.0.4,<9 +python-multipart>=0.0.12,<1 +uvicorn>=0.31.1,<1 \ No newline at end of file diff --git a/server/src/main.py b/server/src/main.py index 305e8c0..7e8c242 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -3,11 +3,17 @@ import string import subprocess import sys +import time from pathlib import Path +import uvicorn +from fastapi import UploadFile from slugify import slugify +from watcloud_utils.fastapi import WATcloudFastAPI +from watcloud_utils.logging import logger, set_up_logging from watcloud_utils.typer import app +set_up_logging() @app.command() def init_cvmfs_repo(repo_name: str): @@ -70,6 +76,25 @@ def start_server(): while True: pass +fastapi_app = WATcloudFastAPI(logger=logger) + + +@fastapi_app.post("/upload") +async def upload_file(file: UploadFile): + # Time the file upload + start_time = time.perf_counter() + await file.read() + end_time = time.perf_counter() + + logger.info(f"Uploaded file: {file.filename} (content_type: {file.content_type}) took {end_time - start_time:.2f}s") + + return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": end_time - start_time} + + +@app.command() +def start_server(port: int = 81): + uvicorn.run(fastapi_app, host="0.0.0.0", port=port) + if __name__ == "__main__": app() From 4b8ead2ce05a88f346c3c17731a8c721907fafd6 Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Sun, 13 Oct 2024 21:22:12 +0000 Subject: [PATCH 2/9] Polish upload --- server/src/main.py | 60 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 13 deletions(-) diff --git a/server/src/main.py b/server/src/main.py index 7e8c242..0ee9b0b 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -7,11 +7,12 @@ from pathlib import Path import uvicorn -from fastapi import UploadFile +from fastapi import UploadFile, HTTPException from slugify import slugify from watcloud_utils.fastapi import WATcloudFastAPI from watcloud_utils.logging import logger, set_up_logging from watcloud_utils.typer import app +from threading import Lock set_up_logging() @@ -77,18 +78,51 @@ def start_server(): pass fastapi_app = WATcloudFastAPI(logger=logger) - - -@fastapi_app.post("/upload") -async def upload_file(file: UploadFile): - # Time the file upload - start_time = time.perf_counter() - await file.read() - end_time = time.perf_counter() - - logger.info(f"Uploaded file: {file.filename} (content_type: {file.content_type}) took {end_time - start_time:.2f}s") - - return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": end_time - start_time} +transaction_lock = Lock() + +@fastapi_app.post("/upload/{repo_name}") +async def upload_file(repo_name: str, file: UploadFile, overwrite: bool = False): + logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})") + + # check if repo exists + if not Path(f"/cvmfs/{repo_name}").exists(): + raise HTTPException(status_code=404, detail=f"Repo {repo_name} does not exist") + + file_path = Path(f"/cvmfs/{repo_name}/{file.filename}") + if not overwrite and file_path.exists(): + raise HTTPException(status_code=409, detail=f"File {file.filename} already exists") + + with transaction_lock: + # start transaction + subprocess.run(["cvmfs_server", "transaction", repo_name], check=True) + + try: + # Remove existing file + if file_path.exists(): + file_path.unlink() + + # Upload file + with file_path.open("wb") as f: + upload_start = time.perf_counter() + f.write(await file.read()) + upload_end = time.perf_counter() + + logger.info(f"Uploaded file: {file.filename} (content_type: {file.content_type}). Took {upload_end - upload_start:.2f}s") + except Exception as e: + logger.error(f"Failed to upload file: {file.filename} (content_type: {file.content_type})") + logger.exception(e) + # abort transaction + subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True) + raise HTTPException(status_code=500, detail="Failed to upload file") + + # publish transaction + publish_start = time.perf_counter() + subprocess.run(["cvmfs_server", "publish", repo_name], check=True) + publish_end = time.perf_counter() + + logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s") + + return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start} @app.command() From 6b81940db484b19a195c7ba24d0d2b5f64eeca9c Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Sun, 13 Oct 2024 22:24:36 +0000 Subject: [PATCH 3/9] Implement download and list --- server/src/main.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/server/src/main.py b/server/src/main.py index 0ee9b0b..4571cc6 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -5,14 +5,15 @@ import sys import time from pathlib import Path +from threading import Lock import uvicorn -from fastapi import UploadFile, HTTPException +from fastapi import HTTPException, UploadFile +from fastapi.responses import FileResponse from slugify import slugify from watcloud_utils.fastapi import WATcloudFastAPI from watcloud_utils.logging import logger, set_up_logging from watcloud_utils.typer import app -from threading import Lock set_up_logging() @@ -81,7 +82,7 @@ def start_server(): transaction_lock = Lock() @fastapi_app.post("/upload/{repo_name}") -async def upload_file(repo_name: str, file: UploadFile, overwrite: bool = False): +async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})") # check if repo exists @@ -124,6 +125,25 @@ async def upload_file(repo_name: str, file: UploadFile, overwrite: bool = False) return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start} +@fastapi_app.get("/download/{repo_name}/{file_name}") +async def download(repo_name: str, file_name: str): + logger.info(f"Downloading file: {file_name} from repo: {repo_name}") + + file_path = Path(f"/cvmfs/{repo_name}/{file_name}") + if not file_path.exists(): + raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}") + + return FileResponse(file_path) + +@fastapi_app.get("/list/{repo_name}") +async def list_files(repo_name: str): + logger.info(f"Listing files in repo: {repo_name}") + + repo_path = Path(f"/cvmfs/{repo_name}") + if not repo_path.exists(): + raise HTTPException(status_code=404, detail=f"Repo {repo_name} does not exist") + + return {"files": [file.name for file in repo_path.iterdir() if file.is_file()]} @app.command() def start_server(port: int = 81): From 5330f8d2168f03ed6b13c4d928f0f6a97a57e7e1 Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Sun, 13 Oct 2024 22:29:07 +0000 Subject: [PATCH 4/9] Enable file deletion --- server/src/main.py | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/server/src/main.py b/server/src/main.py index 4571cc6..2c142c4 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -81,7 +81,7 @@ def start_server(): fastapi_app = WATcloudFastAPI(logger=logger) transaction_lock = Lock() -@fastapi_app.post("/upload/{repo_name}") +@fastapi_app.post("/repos/{repo_name}/upload") async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})") @@ -125,7 +125,7 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start} -@fastapi_app.get("/download/{repo_name}/{file_name}") +@fastapi_app.get("/repos/{repo_name}/{file_name}") async def download(repo_name: str, file_name: str): logger.info(f"Downloading file: {file_name} from repo: {repo_name}") @@ -135,7 +135,7 @@ async def download(repo_name: str, file_name: str): return FileResponse(file_path) -@fastapi_app.get("/list/{repo_name}") +@fastapi_app.get("/repos/{repo_name}") async def list_files(repo_name: str): logger.info(f"Listing files in repo: {repo_name}") @@ -145,6 +145,34 @@ async def list_files(repo_name: str): return {"files": [file.name for file in repo_path.iterdir() if file.is_file()]} +@fastapi_app.delete("/repos/{repo_name}/{file_name}") +async def delete_file(repo_name: str, file_name: str): + logger.info(f"Deleting file: {file_name} from repo: {repo_name}") + + file_path = Path(f"/cvmfs/{repo_name}/{file_name}") + if not file_path.exists(): + raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}") + + with transaction_lock: + # start transaction + subprocess.run(["cvmfs_server", "transaction", repo_name], check=True) + + try: + # Remove file + file_path.unlink() + logger.info(f"Deleted file: {file_name} from repo: {repo_name}") + except Exception as e: + logger.error(f"Failed to delete file: {file_name} from repo: {repo_name}") + logger.exception(e) + # abort transaction + subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True) + raise HTTPException(status_code=500, detail="Failed to delete file") + + # publish transaction + subprocess.run(["cvmfs_server", "publish", repo_name], check=True) + + return {"filename": file_name} + @app.command() def start_server(port: int = 81): uvicorn.run(fastapi_app, host="0.0.0.0", port=port) From 09ab8d8f3e8a113b1cafe576216ed663015ac80d Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Sun, 13 Oct 2024 23:13:34 +0000 Subject: [PATCH 5/9] Enable support for volatile repos and garbage collection --- README.md | 4 +++- server/src/main.py | 48 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c714078..3f197fa 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,9 @@ A CVMFS stratum 0 server meant fo storing ephemeral data. The main features are: These features make it suitable for storing short-lived artifacts in CI/CD pipelines. Coming soon: -- [ ] File upload API (we may be able to simply use the [publisher](https://cvmfs.readthedocs.io/en/stable/cpt-repository-gateway.html#publisher-configuration). It has nice features like being able to handle concurrent transactions.) +- [x] File upload API (we may be able to simply use the [publisher](https://cvmfs.readthedocs.io/en/stable/cpt-repository-gateway.html#publisher-configuration). It has nice features like being able to handle concurrent transactions.) + - The publisher appears to be bottlenecked at 20MiB/s when running the server in Kubernetes, and around 80MiB/s when running in Docker. `iperf` gives much higher bandwidth (between nodes and between the Kubernetes container and nodes), so it's likely not a network bottleneck. + - When using the custom FastAPI upload server, speeds reach over 400MiB/s easily. We'll adopt this approach. - [ ] Garbage collection - [ ] Better documentation - [ ] Automatic [whitelist re-signing](https://cvmfs.readthedocs.io/en/stable/apx-security.html#signature-details) diff --git a/server/src/main.py b/server/src/main.py index 2c142c4..ff8199c 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -14,11 +14,26 @@ from watcloud_utils.fastapi import WATcloudFastAPI from watcloud_utils.logging import logger, set_up_logging from watcloud_utils.typer import app +from typing_extensions import Annotated +import typer + set_up_logging() @app.command() -def init_cvmfs_repo(repo_name: str): +def init_cvmfs_repo( + repo_name: Annotated[str, typer.Argument(help="Name of the CVMFS repo. CVMFS requires this to be an FQDN.")], + volatile: Annotated[bool, typer.Option(help="Whether the repo is volatile or not. If True, the repo will be created (cvmfs_server mkfs) with the -v flag.")] = True, + enable_garbage_collection: Annotated[bool, typer.Option(help="Whether to enable garbage collection for the repo.")] = True, + disable_auto_tag: Annotated[bool, typer.Option(help="Whether to disable auto-tagging for the repo.")] = True, + compression_algorithm: Annotated[str, typer.Option(help="Compression algorithm to use for the repo.")] = "none", + file_mbyte_limit: Annotated[int, typer.Option(help="Maximum file size in MiB that can be uploaded to the repo.")] = 4096, +): + """ + Initialize a CVMFS repo. + + Docs: https://cvmfs.readthedocs.io/en/stable/cpt-repo.html + """ print(f"Initializing CVMFS repo: {repo_name}") # Make apache2 serve cvmfs repos @@ -37,10 +52,23 @@ def init_cvmfs_repo(repo_name: str): sys.exit(f"Failed to start apache2 service (exit code: {res.returncode})") # Run cvmfs_server mkfs - res = subprocess.run(["cvmfs_server", "mkfs", "-o", "root", "-Z", "none", repo_name], check=True) + res = subprocess.run( + ["cvmfs_server", "mkfs", "-o", "root", "-Z", compression_algorithm] + + (["-v"] if volatile else []) + + (["-z"] if enable_garbage_collection else []) + + (["-g"] if disable_auto_tag else []) + + [repo_name], + check=True + ) if res.returncode != 0: sys.exit(f"Failed to run cvmfs_server mkfs (exit code: {res.returncode})") + # Populate repo configuration + repo_config_path = Path(f"/etc/cvmfs/repositories.d/{repo_name}/server.conf") + with open(repo_config_path, "a") as f: + f.write("\n") + f.write(f"CVMFS_FILE_MBYTE_LIMIT={file_mbyte_limit}\n") + # Make the public key and certificate available via HTTP # Useful for clients and publishers: # https://cvmfs.readthedocs.io/en/stable/cpt-repository-gateway.html#example-procedure @@ -81,7 +109,7 @@ def start_server(): fastapi_app = WATcloudFastAPI(logger=logger) transaction_lock = Lock() -@fastapi_app.post("/repos/{repo_name}/upload") +@fastapi_app.post("/repos/{repo_name}") async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})") @@ -173,6 +201,20 @@ async def delete_file(repo_name: str, file_name: str): return {"filename": file_name} +@app.command() +@fastapi_app.post("/gc") +def gc(): + """ + Perform garbage collection on all repos. + """ + with transaction_lock: + logger.info("Running garbage collection") + gc_start = time.perf_counter() + subprocess.run(["cvmfs_server", "gc", "-r", "0", "-f"], check=True) + gc_end = time.perf_counter() + logger.info(f"Garbage collection completed. Took {gc_end - gc_start:.2f}s") + return {"message": "Garbage collection completed", "gc_time_s": gc_end - gc_start} + @app.command() def start_server(port: int = 81): uvicorn.run(fastapi_app, host="0.0.0.0", port=port) From bda968f9fdb2c15483dab047c023d674c7d38b29 Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Sun, 13 Oct 2024 23:13:45 +0000 Subject: [PATCH 6/9] Organize imports --- server/src/main.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main.py b/server/src/main.py index ff8199c..9d24f65 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -7,16 +7,15 @@ from pathlib import Path from threading import Lock +import typer import uvicorn from fastapi import HTTPException, UploadFile from fastapi.responses import FileResponse from slugify import slugify +from typing_extensions import Annotated from watcloud_utils.fastapi import WATcloudFastAPI from watcloud_utils.logging import logger, set_up_logging from watcloud_utils.typer import app -from typing_extensions import Annotated -import typer - set_up_logging() From 0169578acae5ba4fffbb8474dfc57a8acde8f7c6 Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Mon, 14 Oct 2024 01:59:36 +0000 Subject: [PATCH 7/9] Add notify feature --- server/src/main.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/server/src/main.py b/server/src/main.py index 9d24f65..8491538 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -150,6 +150,8 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False): logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s") + notify(repo_name) + return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start} @fastapi_app.get("/repos/{repo_name}/{file_name}") @@ -197,6 +199,7 @@ async def delete_file(repo_name: str, file_name: str): # publish transaction subprocess.run(["cvmfs_server", "publish", repo_name], check=True) + notify(repo_name) return {"filename": file_name} @@ -214,6 +217,33 @@ def gc(): logger.info(f"Garbage collection completed. Took {gc_end - gc_start:.2f}s") return {"message": "Garbage collection completed", "gc_time_s": gc_end - gc_start} + +@app.command() +@fastapi_app.post("/repos/{repo_name}/notify") +def notify(repo_name: str): + """ + Use cvmfs-gateway to notify clients about changes in the repo. + """ + logger.info(f"Notifying clients about changes in repo: {repo_name}") + subprocess.run( + [ + "cvmfs_swissknife", + "notify", + # publish + "-p", + # notification server URL + "-u", + "http://localhost:4929/api/v1", + # URL of the repository + "-r", + f"http://localhost/cvmfs/{repo_name}", + ], + check=True, + ) + + return {"message": f"Notified clients about changes in repo {repo_name}"} + + @app.command() def start_server(port: int = 81): uvicorn.run(fastapi_app, host="0.0.0.0", port=port) From 64785e88f39a7e3691186be568faf3d86d41b222 Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Mon, 14 Oct 2024 02:01:30 +0000 Subject: [PATCH 8/9] Add repo existence check to notify --- server/src/main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main.py b/server/src/main.py index 8491538..fe06b01 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -225,6 +225,10 @@ def notify(repo_name: str): Use cvmfs-gateway to notify clients about changes in the repo. """ logger.info(f"Notifying clients about changes in repo: {repo_name}") + + if not Path(f"/cvmfs/{repo_name}").exists(): + raise HTTPException(status_code=404, detail=f"Repo {repo_name} does not exist") + subprocess.run( [ "cvmfs_swissknife", From 01ef8d6b38b62d25c0940020c345b542f65ee81c Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Mon, 14 Oct 2024 03:15:53 +0000 Subject: [PATCH 9/9] Implement periodic GC --- README.md | 2 +- server/requirements.txt | 3 ++- server/src/main.py | 28 ++++++++++++++++++++-------- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 3f197fa..39e2462 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Coming soon: - [x] File upload API (we may be able to simply use the [publisher](https://cvmfs.readthedocs.io/en/stable/cpt-repository-gateway.html#publisher-configuration). It has nice features like being able to handle concurrent transactions.) - The publisher appears to be bottlenecked at 20MiB/s when running the server in Kubernetes, and around 80MiB/s when running in Docker. `iperf` gives much higher bandwidth (between nodes and between the Kubernetes container and nodes), so it's likely not a network bottleneck. - When using the custom FastAPI upload server, speeds reach over 400MiB/s easily. We'll adopt this approach. -- [ ] Garbage collection +- [x] Garbage collection - [ ] Better documentation - [ ] Automatic [whitelist re-signing](https://cvmfs.readthedocs.io/en/stable/apx-security.html#signature-details) diff --git a/server/requirements.txt b/server/requirements.txt index afc0a41..8b03d3a 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -1,4 +1,5 @@ watcloud-utils @ git+https://github.com/WATonomous/watcloud-utils.git@c8ce1006716e65971f750560f90f442721b3777d python-slugify>=8.0.4,<9 python-multipart>=0.0.12,<1 -uvicorn>=0.31.1,<1 \ No newline at end of file +uvicorn>=0.31.1,<1 +apscheduler>=3.10.4,<4 \ No newline at end of file diff --git a/server/src/main.py b/server/src/main.py index fe06b01..0fe3a23 100644 --- a/server/src/main.py +++ b/server/src/main.py @@ -4,16 +4,19 @@ import subprocess import sys import time +from contextlib import asynccontextmanager from pathlib import Path from threading import Lock import typer import uvicorn +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger from fastapi import HTTPException, UploadFile from fastapi.responses import FileResponse from slugify import slugify from typing_extensions import Annotated -from watcloud_utils.fastapi import WATcloudFastAPI +from watcloud_utils.fastapi import WATcloudFastAPI, FastAPI from watcloud_utils.logging import logger, set_up_logging from watcloud_utils.typer import app @@ -99,13 +102,22 @@ def init_cvmfs_repo( print(f"Successfully initialized CVMFS repo: {repo_name}") print(f"The public key is available via HTTP at GET /cvmfs-meta/{repo_name}.pub") -@app.command() -def start_server(): - print("Starting server") - while True: - pass - -fastapi_app = WATcloudFastAPI(logger=logger) +@asynccontextmanager +async def fastapi_lifespan(app: FastAPI): + """ + This function wraps the FastAPI app in a lifespan context manager. + i.e. it allows us to run code when the app starts and stops. + """ + try: + scheduler.start() + # Run garbage collection every minute + scheduler.add_job(gc, CronTrigger.from_crontab("* * * * *")) + yield + finally: + scheduler.shutdown() + +scheduler = BackgroundScheduler() +fastapi_app = WATcloudFastAPI(logger=logger, lifespan=fastapi_lifespan) transaction_lock = Lock() @fastapi_app.post("/repos/{repo_name}")