Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement file TTL #4

Merged
merged 2 commits into from
Oct 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 161 additions & 15 deletions server/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

set_up_logging()

# Name of the per-repo JSON file (/cvmfs/<repo_name>/ttl.json) that maps each
# stored file name to its expiry record: {"expires_at": <time.time() value>}.
TTL_FILENAME = "ttl.json"
# Default value of the upload endpoint's `ttl_s` parameter, in seconds (7200 s = 2 h).
DEFAULT_TTL_S = 7200

# File names that the upload, TTL-update and delete endpoints reject with HTTP 400,
# so that callers cannot overwrite or remove the TTL bookkeeping file itself.
FILENAME_BLACKLIST = [TTL_FILENAME]

@app.command()
def init_cvmfs_repo(
repo_name: Annotated[str, typer.Argument(help="Name of the CVMFS repo. CVMFS requires this to be an FQDN.")],
Expand Down Expand Up @@ -110,8 +115,8 @@ async def fastapi_lifespan(app: FastAPI):
"""
try:
scheduler.start()
# Run garbage collection every minute
scheduler.add_job(gc, CronTrigger.from_crontab("* * * * *"))
# Run housekeeping every minute
scheduler.add_job(housekeeping, CronTrigger.from_crontab("* * * * *"))
yield
finally:
scheduler.shutdown()
Expand All @@ -121,8 +126,11 @@ async def fastapi_lifespan(app: FastAPI):
transaction_lock = Lock()

@fastapi_app.post("/repos/{repo_name}")
async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type})")
async def upload(repo_name: str, file: UploadFile, overwrite: bool = False, ttl_s: int = DEFAULT_TTL_S):
logger.info(f"Uploading file: {file.filename} (content_type: {file.content_type}, ttl_s: {ttl_s}) to repo: {repo_name}")

if file.filename in FILENAME_BLACKLIST:
raise HTTPException(status_code=400, detail=f"Filename {file.filename} is not allowed")

# check if repo exists
if not Path(f"/cvmfs/{repo_name}").exists():
Expand All @@ -132,6 +140,9 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
if not overwrite and file_path.exists():
raise HTTPException(status_code=409, detail=f"File {file.filename} already exists")

expires_at = time.time() + ttl_s
ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")

with transaction_lock:
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)
Expand All @@ -147,6 +158,11 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
f.write(await file.read())
upload_end = time.perf_counter()

# Update TTL
ttl_obj = json.loads(ttl_path.read_text()) if ttl_path.exists() else {}
ttl_obj[file.filename] = {"expires_at": expires_at}
ttl_path.write_text(json.dumps(ttl_obj))

logger.info(f"Uploaded file: {file.filename} (content_type: {file.content_type}). Took {upload_end - upload_start:.2f}s")
except Exception as e:
logger.error(f"Failed to upload file: {file.filename} (content_type: {file.content_type})")
Expand All @@ -160,12 +176,59 @@ async def upload(repo_name: str, file: UploadFile, overwrite: bool = False):
subprocess.run(["cvmfs_server", "publish", repo_name], check=True)
publish_end = time.perf_counter()

logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s")
logger.info(f"Published transaction for repo: {repo_name} with file: {file.filename} (content_type: {file.content_type}). Took {publish_end - publish_start:.2f}s. Expires at: {expires_at}")

notify(repo_name)

return {"filename": file.filename, "content_type": file.content_type, "upload_time_s": upload_end - upload_start, "publish_time_s": publish_end - publish_start}
return {
"filename": file.filename,
"content_type": file.content_type,
"expires_at": expires_at,
"upload_time_s": upload_end - upload_start,
"publish_time_s": publish_end - publish_start,
}


@app.command()
@fastapi_app.post("/repos/{repo_name}/{file_name}/ttl")
async def update_ttl(repo_name: str, file_name: str, ttl_s: int):
    """
    Reset the expiry of an existing file to `ttl_s` seconds from now.

    Responds with 400 if `file_name` is a reserved name, 404 if the file does not
    exist in the repo, and 500 if the TTL record could not be rewritten (the
    transaction is aborted in that case).
    """
    logger.info(f"Updating TTL for file: {file_name} in repo: {repo_name}")

    if file_name in FILENAME_BLACKLIST:
        raise HTTPException(status_code=400, detail=f"Filename {file_name} is not allowed")

    file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
    if not file_path.exists():
        raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}")

    ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")

    with transaction_lock:
        # start transaction
        subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

        try:
            # Update TTL. The TTL file may not exist yet (e.g. the file predates TTL
            # tracking), so start from an empty mapping exactly like upload does.
            ttl_obj = json.loads(ttl_path.read_text()) if ttl_path.exists() else {}
            ttl_obj[file_name] = {"expires_at": time.time() + ttl_s}
            ttl_path.write_text(json.dumps(ttl_obj))

            logger.info(f"Updated TTL for file: {file_name} in repo: {repo_name}")
        except Exception as e:
            logger.error(f"Failed to update TTL for file: {file_name} in repo: {repo_name}")
            logger.exception(e)
            # abort transaction
            subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True)
            # Chain the cause so the original traceback is preserved.
            raise HTTPException(status_code=500, detail=f"Failed to update TTL for file: {file_name}: {e}") from e

        # publish transaction
        subprocess.run(["cvmfs_server", "publish", repo_name], check=True)

    # NOTE(review): SOURCE lost its indentation, so whether notify() originally ran
    # inside the lock cannot be determined. It is called after the lock is released
    # here, which is safe whether or not notify() takes the lock itself - confirm.
    notify(repo_name)

    return {"filename": file_name, "ttl_s": ttl_s}


@app.command()
@fastapi_app.get("/repos/{repo_name}/{file_name}")
async def download(repo_name: str, file_name: str):
logger.info(f"Downloading file: {file_name} from repo: {repo_name}")
Expand All @@ -176,6 +239,7 @@ async def download(repo_name: str, file_name: str):

return FileResponse(file_path)

@app.command()
@fastapi_app.get("/repos/{repo_name}")
async def list_files(repo_name: str):
logger.info(f"Listing files in repo: {repo_name}")
Expand All @@ -186,21 +250,33 @@ async def list_files(repo_name: str):

return {"files": [file.name for file in repo_path.iterdir() if file.is_file()]}

@app.command()
@fastapi_app.delete("/repos/{repo_name}/{file_name}")
async def delete_file(repo_name: str, file_name: str):
logger.info(f"Deleting file: {file_name} from repo: {repo_name}")

if file_name in FILENAME_BLACKLIST:
raise HTTPException(status_code=400, detail=f"Filename {file_name} is not allowed")

file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File {file_name} does not exist in repo {repo_name}")

ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")

with transaction_lock:
# start transaction
subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

try:
# Remove file
file_path.unlink()

# Update TTL
ttl_obj = json.loads(ttl_path.read_text())
del ttl_obj[file_name]
ttl_path.write_text(json.dumps(ttl_obj))

logger.info(f"Deleted file: {file_name} from repo: {repo_name}")
except Exception as e:
logger.error(f"Failed to delete file: {file_name} from repo: {repo_name}")
Expand All @@ -216,19 +292,56 @@ async def delete_file(repo_name: str, file_name: str):
return {"filename": file_name}

@app.command()
@fastapi_app.post("/repos/{repo_name}/clean")
def clean(repo_name: str):
    """
    Clean up expired files in the repo.

    Reads the repo's TTL file, removes every file whose `expires_at` is in the
    past, drops those entries from the TTL file and publishes the result.
    Returns early when the repo has no TTL file. A transaction is only opened
    when at least one entry has expired. Responds with 500 if the TTL file cannot
    be read or the clean-up fails (an open transaction is aborted first).
    """
    logger.info(f"Cleaning up expired files in repo: {repo_name}")

    ttl_path = Path(f"/cvmfs/{repo_name}/{TTL_FILENAME}")
    if not ttl_path.exists():
        logger.info(f"No TTL file found in repo: {repo_name}. Skipping clean up.")
        return {"message": "No TTL file found. Skipping clean up."}

    cleaned = 0
    errors = 0

    with transaction_lock:
        # Work out what has expired before opening a transaction, so a run with
        # nothing to remove skips the transaction/publish/notify cycle entirely.
        try:
            ttl_obj = json.loads(ttl_path.read_text())
            now = time.time()  # one reference time for the whole pass
            expired = [name for name, ttl in ttl_obj.items() if ttl["expires_at"] < now]
        except Exception as e:
            logger.error(f"Failed to clean up expired files in repo: {repo_name}")
            logger.exception(e)
            raise HTTPException(status_code=500, detail=f"Failed to clean up expired files: {e}") from e

        if expired:
            # start transaction
            subprocess.run(["cvmfs_server", "transaction", repo_name], check=True)

            try:
                for file_name in expired:
                    file_path = Path(f"/cvmfs/{repo_name}/{file_name}")
                    if file_path.exists():
                        file_path.unlink()
                        cleaned += 1
                    else:
                        logger.warning(f"Trying to clean up non-existent file: {file_name} in repo: {repo_name}")
                        errors += 1
                    # The entry is dropped either way so a stale record is not retried forever.
                    del ttl_obj[file_name]

                ttl_path.write_text(json.dumps(ttl_obj))

                logger.info(f"Cleaned up expired files in repo: {repo_name}")
            except Exception as e:
                logger.error(f"Failed to clean up expired files in repo: {repo_name}")
                logger.exception(e)
                # abort transaction
                subprocess.run(["cvmfs_server", "abort", repo_name, "-f"], check=True)
                raise HTTPException(status_code=500, detail=f"Failed to clean up expired files: {e}") from e

            # publish transaction
            subprocess.run(["cvmfs_server", "publish", repo_name], check=True)

    if expired:
        # NOTE(review): SOURCE lost its indentation; notify() is called after the
        # lock is released, which is safe whether or not it takes the lock - confirm.
        notify(repo_name)

    msg = f"Cleaned up {cleaned} expired files in repo: {repo_name}. Errors: {errors}"
    logger.info(msg)

    return {"message": msg}

@app.command()
@fastapi_app.post("/repos/{repo_name}/notify")
Expand Down Expand Up @@ -260,6 +373,39 @@ def notify(repo_name: str):
return {"message": f"Notified clients about changes in repo {repo_name}"}


@app.command()
@fastapi_app.post("/gc")
def gc():
    """
    Perform garbage collection on all repos.
    """
    # Same invocation as before, just named: `cvmfs_server gc -r 0 -f`.
    gc_command = ["cvmfs_server", "gc", "-r", "0", "-f"]

    # Hold the transaction lock for the duration of the run; only the command
    # itself is timed.
    with transaction_lock:
        logger.info("Running garbage collection")
        started_at = time.perf_counter()
        subprocess.run(gc_command, check=True)
        finished_at = time.perf_counter()

    logger.info(f"Garbage collection completed. Took {finished_at - started_at:.2f}s")
    return {
        "message": "Garbage collection completed",
        "gc_time_s": finished_at - started_at,
    }

@app.command()
@fastapi_app.post("/housekeeping")
def housekeeping():
    """
    Clean all repos and perform garbage collection.

    Every directory under /cvmfs is cleaned in turn. A failure while cleaning one
    repo is logged and does not stop the remaining repos from being cleaned or
    garbage collection from running. If any repo failed, a 500 is raised once the
    whole pass has finished.
    """
    logger.info("Running housekeeping")
    housekeeping_start = time.perf_counter()

    failed_repos = []
    for repo_path in Path("/cvmfs").iterdir():
        if not repo_path.is_dir():
            continue
        repo_name = repo_path.name
        try:
            clean(repo_name)
        except Exception as e:
            # Boundary handler: one broken repo must not block housekeeping
            # for every other repo. The failure is reported after the pass.
            logger.error(f"Failed to clean repo: {repo_name} during housekeeping")
            logger.exception(e)
            failed_repos.append(repo_name)

    gc()
    housekeeping_end = time.perf_counter()

    if failed_repos:
        raise HTTPException(
            status_code=500,
            detail=f"Housekeeping failed to clean repos: {', '.join(failed_repos)}",
        )

    logger.info(f"Housekeeping completed. Took {housekeeping_end - housekeeping_start:.2f}s")
    return {"message": "Housekeeping completed", "housekeeping_time_s": housekeeping_end - housekeeping_start}

@app.command()
def start_server(port: int = 81):
uvicorn.run(fastapi_app, host="0.0.0.0", port=port)
Expand Down
Loading