Refactor redis everywhere into one db.py class (#104)
* Refactor redis everywhere into one db.py class

Right now the data handling is a fragile mess. If we want to add any
more features, we need to fix it first. As a first step, I've moved all
operations touching Redis into a separate file (db.py).

Co-authored-by: Michał Praszmo <[email protected]>
msm-code and nazywam authored Apr 17, 2020
1 parent f553786 commit cfc00f9
Showing 6 changed files with 330 additions and 168 deletions.
75 changes: 19 additions & 56 deletions src/app.py
@@ -1,8 +1,4 @@
-import json
 import os
-import random
-import string
-import time
 
 import uvicorn
 from datetime import datetime
@@ -16,7 +12,8 @@
 from lib.ursadb import UrsaDb
 from lib.yaraparse import parse_yara
 
-from util import make_redis, mquery_version
+from util import mquery_version
+from db import Database, JobId
 import config
 from typing import Any, Callable, List, Union

@@ -37,9 +34,9 @@
     BackendStatusDatasetsSchema,
 )
 
-redis = make_redis()
+db = Database()
 app = FastAPI()
-db = UrsaDb(config.BACKEND)
+ursa = UrsaDb(config.BACKEND)
 
 
 @app.middleware("http")
@@ -58,9 +55,7 @@ async def add_headers(request: Request, call_next: Callable) -> Response:
 
 @app.get("/api/download")
 def download(job_id: str, ordinal: str, file_path: str) -> Any:
-    file_list = redis.lrange("meta:" + job_id, ordinal, ordinal)
-
-    if not file_list or file_path != json.loads(file_list[0])["file"]:
+    if not db.job_contains(JobId(job_id), ordinal, file_path):
         raise NotFound("No such file in result set.")
 
     attach_name, ext = os.path.splitext(os.path.basename(file_path))
@@ -96,63 +91,31 @@ def query(
             for rule in rules
         ]
 
-    job_hash = "".join(
-        random.SystemRandom().choice(string.ascii_uppercase + string.digits)
-        for _ in range(12)
+    job = db.create_search_task(
+        rules[-1].name,
+        rules[-1].author,
+        data.raw_yara,
+        data.priority,
+        data.taint,
     )
 
-    job_obj = {
-        "status": "new",
-        "rule_name": rules[-1].name,
-        "rule_author": rules[-1].author,
-        "raw_yara": data.raw_yara,
-        "submitted": int(time.time()),
-        "priority": data.priority,
-    }
-
-    if data.taint is not None:
-        job_obj["taint"] = data.taint
-
-    redis.hmset("job:" + job_hash, job_obj)
-    redis.rpush("queue-search", job_hash)
-
-    return QueryResponseSchema(query_hash=job_hash)
+    return QueryResponseSchema(query_hash=job.hash)
 
 
 @app.get("/api/matches/{hash}", response_model=MatchesSchema)
 def matches(
     hash: str, offset: int = Query(...), limit: int = Query(...)
 ) -> MatchesSchema:
-    p = redis.pipeline(transaction=False)
-    p.hgetall("job:" + hash)
-    p.lrange("meta:" + hash, offset, offset + limit - 1)
-    job, meta = p.execute()
-    return MatchesSchema(job=job, matches=[json.loads(m) for m in meta])
-
-
-def get_job(job_id: str) -> JobSchema:
-    job = redis.hgetall(job_id)
-    return JobSchema(
-        id=job_id[4:],
-        status=job.get("status", "ERROR"),
-        rule_name=job.get("rule_name", "ERROR"),
-        rule_author=job.get("rule_author", None),
-        raw_yara=job.get("raw_yara", "ERROR"),
-        submitted=job.get("submitted", 0),
-        priority=job.get("priority", "medium"),
-        files_processed=job.get("files_processed", 0),
-        total_files=job.get("total_files", 0),
-    )
+    return db.get_job_matches(JobId(hash), offset, limit)
 
 
 @app.get("/api/job/{job_id}", response_model=JobSchema)
 def job_info(job_id: str) -> JobSchema:
-    return get_job(f"job:{job_id}")
+    return db.get_job(JobId(job_id))
 
 
 @app.delete("/api/job/{job_id}", response_model=StatusSchema)
 def job_cancel(job_id: str) -> StatusSchema:
-    redis.hmset("job:" + job_id, {"status": "cancelled"})
+    db.cancel_job(JobId(job_id))
    return StatusSchema(status="ok")


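The hunk above carries the bulk of the refactor: job creation, match listing, job lookup, and cancellation each collapse into a single Database call. The diff excerpt doesn't include db.py itself, so the following is a sketch of how those methods might look, assuming the removed logic moved over mostly verbatim; the JobId fields, the constructor, and the host/port handling are guesses, not taken from the commit:

import json
import random
import string
import time
from typing import Any, Dict, Optional

import redis


class JobId:
    # JobId is named in the diff; treating it as a thin wrapper around
    # the raw job hash is an assumption.
    def __init__(self, hash: str) -> None:
        self.hash = hash
        self.key = "job:" + hash


class Database:
    def __init__(self) -> None:
        # Presumably whatever util.make_redis() did before; host and port
        # would come from config in the real module.
        self.redis = redis.StrictRedis(decode_responses=True)

    def create_search_task(
        self,
        rule_name: str,
        rule_author: str,
        raw_yara: str,
        priority: str,
        taint: Optional[str],
    ) -> JobId:
        # Same ID scheme as the removed inline code: 12 random characters.
        job_hash = "".join(
            random.SystemRandom().choice(string.ascii_uppercase + string.digits)
            for _ in range(12)
        )
        job_obj = {
            "status": "new",
            "rule_name": rule_name,
            "rule_author": rule_author,
            "raw_yara": raw_yara,
            "submitted": int(time.time()),
            "priority": priority,
        }
        if taint is not None:
            job_obj["taint"] = taint
        self.redis.hmset("job:" + job_hash, job_obj)
        self.redis.rpush("queue-search", job_hash)
        return JobId(job_hash)

    def get_job(self, job: JobId) -> Dict[str, Any]:
        # The old app.py get_job() helper, minus the JobSchema wrapping,
        # which may equally well still happen in the caller.
        return self.redis.hgetall(job.key)

    def get_job_matches(self, job: JobId, offset: int, limit: int) -> Dict[str, Any]:
        # The pipeline removed from matches(), returning raw data for the
        # MatchesSchema construction.
        p = self.redis.pipeline(transaction=False)
        p.hgetall(job.key)
        p.lrange("meta:" + job.hash, offset, offset + limit - 1)
        job_data, meta = p.execute()
        return {"job": job_data, "matches": [json.loads(m) for m in meta]}

    def cancel_job(self, job: JobId) -> None:
        self.redis.hmset(job.key, {"status": "cancelled"})

The payoff is that app.py no longer knows about key layouts like job:<hash> and meta:<hash>, or about the queue-search list; those become implementation details of one module.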
@@ -202,15 +165,15 @@ def user_jobs(name: str) -> List[JobSchema]:
 
 @app.get("/api/job", response_model=JobsSchema)
 def job_statuses() -> JobsSchema:
-    jobs = [get_job(j) for j in redis.keys("job:*")]
+    jobs = [db.get_job(job) for job in db.get_job_ids()]
     jobs = sorted(jobs, key=lambda j: j.submitted, reverse=True)
     return JobsSchema(jobs=jobs)
 
 
 @app.get("/api/backend", response_model=BackendStatusSchema)
 def backend_status() -> BackendStatusSchema:
     db_alive = True
-    status = db.status()
+    status = ursa.status()
     try:
         tasks = status.get("result", {}).get("tasks", [])
         ursadb_version = status.get("result", {}).get(
@@ -236,7 +199,7 @@ def backend_status_datasets() -> BackendStatusDatasetsSchema:
     db_alive = True
 
     try:
-        datasets = db.topology().get("result", {}).get("datasets", {})
+        datasets = ursa.topology().get("result", {}).get("datasets", {})
     except Again:
         db_alive = False
         datasets = {}
@@ -258,7 +221,7 @@ def serve_index_sub() -> FileResponse:
 
 @app.get("/api/compactall")
 def compact_all() -> StatusSchema:
-    redis.rpush("queue-commands", "compact all;")
+    db.run_command("compact all;")
     return StatusSchema(status="ok")

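The smaller hunks account for the rest of the new surface: job_contains (from the download endpoint), get_job_ids (from the redis.keys("job:*") scan), and run_command (from the compactall queue push). Again a sketch built only from the calls this diff removes, with the same assumed JobId wrapper and constructor as above:

import json
from typing import List

import redis


class JobId:
    def __init__(self, hash: str) -> None:
        self.hash = hash


class Database:
    def __init__(self) -> None:
        self.redis = redis.StrictRedis(decode_responses=True)

    def job_contains(self, job: JobId, ordinal: str, file_path: str) -> bool:
        # The lrange + JSON field check removed from download(); ordinal is
        # the match index within the job's meta list.
        file_list = self.redis.lrange("meta:" + job.hash, ordinal, ordinal)
        return bool(file_list) and file_path == json.loads(file_list[0])["file"]

    def get_job_ids(self) -> List[JobId]:
        # Replaces the keys("job:*") scan that job_statuses() used to do.
        return [JobId(key[len("job:"):]) for key in self.redis.keys("job:*")]

    def run_command(self, command: str) -> None:
        # compact_all() used to rpush the raw command string onto
        # "queue-commands" for the backend to pick up.
        self.redis.rpush("queue-commands", command)

Whatever the real bodies look like, routing every Redis touch through one class means the key schema can change, or the storage backend can be swapped, without editing every endpoint, which is exactly the maintainability problem the commit message calls out.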