Execution time tracking tweaks (#1994)
Tweaks to how execution time is tracked, for more accuracy and to exclude
waiting states:
- don't update execution time if the crawl state is a 'waiting state'
  (waiting for capacity or waiting for org limit)
- rename start states -> waiting states for clarity
- reset lastUpdatedTime after two consecutive updates in a non-running
  state, so that non-running states don't count toward execution time while
  still allowing for occasional hiccups: if only one update detects a
  non-running state, don't reset (see the sketch after this list)
- webhooks: move the start webhook to when the crawl actually starts for the
  first time (db lastUpdatedTime is not yet set and the crawl is running)
- don't set lastUpdatedTime until pods are actually running
- set the crawljob update interval to every 10 seconds for more accurate
  execution time tracking
- frontend: show seconds in the 'Execution Time' display
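
As a rough illustration of the reset rule above, here is a minimal sketch; it is not the operator code itself, and ExecTimeTracker and its fields are hypothetical, though WAITING_STATES mirrors the constant introduced by this commit:

    from datetime import datetime, timedelta
    from typing import Optional

    WAITING_STATES = ("starting", "waiting_capacity", "waiting_org_limit")


    class ExecTimeTracker:
        """Accumulate execution time only while the crawl is actually running."""

        def __init__(self) -> None:
            self.last_updated_time: Optional[datetime] = None
            self.last_state: str = "starting"
            self.exec_seconds: int = 0

        def update(self, state: str, now: datetime) -> None:
            if state in WAITING_STATES:
                # reset only on the 2nd consecutive waiting update, so a single
                # transient blip in the reported state doesn't discard time
                if self.last_state in WAITING_STATES:
                    self.last_updated_time = now
                self.last_state = state
                return

            # running: count the time elapsed since the last update
            if self.last_updated_time is not None:
                self.exec_seconds += int((now - self.last_updated_time).total_seconds())
            self.last_updated_time = now
            self.last_state = state


    t0 = datetime(2024, 8, 6)
    tracker = ExecTimeTracker()
    tracker.update("running", t0)
    tracker.update("waiting_capacity", t0 + timedelta(seconds=10))  # single blip
    tracker.update("running", t0 + timedelta(seconds=20))
    assert tracker.exec_seconds == 20  # the blip did not discard the interval

Two waiting updates in a row keep pushing the clock forward, so waiting time never counts; a single waiting update sandwiched between running ones leaves the clock alone.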
ikreymer authored Aug 6, 2024
1 parent ec29928 commit 7fa2b61
Showing 8 changed files with 61 additions and 44 deletions.

backend/btrixcloud/basecrawls.py (4 changes: 2 additions & 2 deletions)
@@ -23,7 +23,7 @@
     PaginatedCrawlOutResponse,
     User,
     StorageRef,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     QARun,
     UpdatedResponse,
@@ -272,7 +272,7 @@ async def update_crawl_state(self, crawl_id: str, state: str):
             {
                 "_id": crawl_id,
                 "type": "crawl",
-                "state": {"$in": RUNNING_AND_STARTING_STATES},
+                "state": {"$in": RUNNING_AND_WAITING_STATES},
             },
             {"$set": data},
         )

backend/btrixcloud/crawls.py (26 changes: 16 additions & 10 deletions)
@@ -44,7 +44,7 @@
     PaginatedCrawlOutResponse,
     PaginatedSeedResponse,
     PaginatedCrawlErrorResponse,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     NON_RUNNING_STATES,
     ALL_CRAWL_STATES,
@@ -165,7 +165,7 @@ async def list_crawls(
             query["userid"] = userid
 
         if running_only:
-            query["state"] = {"$in": RUNNING_AND_STARTING_STATES}
+            query["state"] = {"$in": RUNNING_AND_WAITING_STATES}
 
         # Override running_only if state list is explicitly passed
         if state:
@@ -425,7 +425,7 @@ async def get_crawl_queue(
 
         state, _ = await self.get_crawl_state(crawl_id, False)
 
-        if state not in RUNNING_AND_STARTING_STATES:
+        if state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")
 
         total = 0
@@ -463,7 +463,7 @@ async def match_crawl_queue(
         limit <= next_offset < limit + step"""
         state, _ = await self.get_crawl_state(crawl_id, False)
 
-        if state not in RUNNING_AND_STARTING_STATES:
+        if state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")
 
         total = 0
@@ -513,7 +513,7 @@ async def add_or_remove_exclusion(
 
         crawl = await self.get_crawl(crawl_id, org)
 
-        if crawl.state not in RUNNING_AND_STARTING_STATES:
+        if crawl.state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")
 
         cid = crawl.cid
@@ -591,30 +591,36 @@ async def inc_crawl_exec_time(
                 "qaCrawlExecSeconds": exec_time,
                 "qa.crawlExecSeconds": exec_time,
             }
+            field = "qa._lut"
         else:
             inc_update = {"crawlExecSeconds": exec_time}
+            field = "_lut"
 
         res = await self.crawls.find_one_and_update(
             {
                 "_id": crawl_id,
                 "type": "crawl",
-                "_lut": {"$ne": last_updated_time},
+                field: {"$ne": last_updated_time},
             },
             {
                 "$inc": inc_update,
-                "$set": {"_lut": last_updated_time},
+                "$set": {field: last_updated_time},
             },
         )
         return res is not None
 
     async def get_crawl_exec_last_update_time(
-        self, crawl_id: str
+        self, crawl_id: str, is_qa: bool
     ) -> Optional[datetime]:
         """get crawl last updated time"""
+        field = "_lut" if not is_qa else "qa._lut"
         res = await self.crawls.find_one(
-            {"_id": crawl_id, "type": "crawl"}, projection=["_lut"]
+            {"_id": crawl_id, "type": "crawl"}, projection=[field]
         )
-        return res and res.get("_lut")
+        if not res:
+            return None
+
+        return res.get("qa", {}).get("_lut") if is_qa else res.get("_lut")
 
     async def get_crawl_state(
         self, crawl_id: str, is_qa: bool
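
For reference, the field: {"$ne": last_updated_time} filter in inc_crawl_exec_time above is what makes the increment idempotent: a retry or overlapping resync that computes the same last_updated_time matches no document and becomes a no-op. A minimal in-memory sketch of that guard (FakeCrawls is hypothetical; only the comparison mirrors the diff):

    from datetime import datetime, timezone


    class FakeCrawls:
        """In-memory stand-in for the crawls collection, just to show the guard."""

        def __init__(self) -> None:
            self.doc = {"_id": "crawl1", "crawlExecSeconds": 0, "_lut": None}

        def inc_exec_time(self, exec_time: int, last_updated_time: datetime) -> bool:
            # mirrors find_one_and_update({"_lut": {"$ne": last_updated_time}}, ...)
            if self.doc["_lut"] == last_updated_time:
                return False  # this update window was already applied
            self.doc["crawlExecSeconds"] += exec_time
            self.doc["_lut"] = last_updated_time
            return True


    crawls = FakeCrawls()
    now = datetime(2024, 8, 6, tzinfo=timezone.utc)
    assert crawls.inc_exec_time(10, now) is True   # first write applies
    assert crawls.inc_exec_time(10, now) is False  # same timestamp is a no-op
    assert crawls.doc["crawlExecSeconds"] == 10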

backend/btrixcloud/models.py (12 changes: 6 additions & 6 deletions)
@@ -208,8 +208,8 @@ class UserOut(BaseModel):
 ]
 RUNNING_STATES = get_args(TYPE_RUNNING_STATES)
 
-TYPE_STARTING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
-STARTING_STATES = get_args(TYPE_STARTING_STATES)
+TYPE_WAITING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
+WAITING_STATES = get_args(TYPE_WAITING_STATES)
 
 TYPE_FAILED_STATES = Literal[
     "canceled",
@@ -228,18 +228,18 @@ class UserOut(BaseModel):
 ]
 SUCCESSFUL_STATES = get_args(TYPE_SUCCESSFUL_STATES)
 
-TYPE_RUNNING_AND_STARTING_STATES = Literal[TYPE_STARTING_STATES, TYPE_RUNNING_STATES]
-RUNNING_AND_STARTING_STATES = [*STARTING_STATES, *RUNNING_STATES]
+TYPE_RUNNING_AND_WAITING_STATES = Literal[TYPE_WAITING_STATES, TYPE_RUNNING_STATES]
+RUNNING_AND_WAITING_STATES = [*WAITING_STATES, *RUNNING_STATES]
 
 RUNNING_AND_STARTING_ONLY = ["starting", *RUNNING_STATES]
 
 TYPE_NON_RUNNING_STATES = Literal[TYPE_FAILED_STATES, TYPE_SUCCESSFUL_STATES]
 NON_RUNNING_STATES = [*FAILED_STATES, *SUCCESSFUL_STATES]
 
 TYPE_ALL_CRAWL_STATES = Literal[
-    TYPE_RUNNING_AND_STARTING_STATES, TYPE_NON_RUNNING_STATES
+    TYPE_RUNNING_AND_WAITING_STATES, TYPE_NON_RUNNING_STATES
 ]
-ALL_CRAWL_STATES = [*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES]
+ALL_CRAWL_STATES = [*RUNNING_AND_WAITING_STATES, *NON_RUNNING_STATES]
 
 
 # ============================================================================

backend/btrixcloud/operator/crawls.py (53 changes: 30 additions & 23 deletions)
@@ -19,8 +19,9 @@
     TYPE_ALL_CRAWL_STATES,
     NON_RUNNING_STATES,
     RUNNING_STATES,
+    WAITING_STATES,
     RUNNING_AND_STARTING_ONLY,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     FAILED_STATES,
     CrawlStats,
@@ -119,6 +120,7 @@ async def sync_crawls(self, data: MCSyncData):
         """sync crawls"""
 
         status = CrawlStatus(**data.parent.get("status", {}))
+        status.last_state = status.state
 
         spec = data.parent.get("spec", {})
         crawl_id = spec["id"]
@@ -250,11 +252,6 @@ async def sync_crawls(self, data: MCSyncData):
 
         else:
             status.scale = 1
-            now = dt_now()
-            await self.crawl_ops.inc_crawl_exec_time(
-                crawl.db_crawl_id, crawl.is_qa, 0, now
-            )
-            status.lastUpdatedTime = to_k8s_date(now)
 
         children = self._load_redis(params, status, data.children)
 
@@ -807,25 +804,13 @@ async def sync_crawl_state(
             status.resync_after = self.fast_retry_secs
             return status
 
-        # if true (state is set), also run webhook
-        if await self.set_state(
+        # ensure running state is set
+        await self.set_state(
             "running",
             status,
             crawl,
             allowed_from=["starting", "waiting_capacity"],
-        ):
-            if not crawl.qa_source_crawl_id:
-                self.run_task(
-                    self.event_webhook_ops.create_crawl_started_notification(
-                        crawl.id, crawl.oid, scheduled=crawl.scheduled
-                    )
-                )
-            else:
-                self.run_task(
-                    self.event_webhook_ops.create_qa_analysis_started_notification(
-                        crawl.id, crawl.oid, crawl.qa_source_crawl_id
-                    )
-                )
+        )
 
         # update lastActiveTime if crawler is running
        if crawler_running:
@@ -967,11 +952,33 @@ async def increment_pod_exec_time(
         """inc exec time tracking"""
         now = dt_now()
 
+        # don't count time crawl is not running
+        if status.state in WAITING_STATES:
+            # reset lastUpdatedTime if at least 2 consecutive updates of non-running state
+            if status.last_state in WAITING_STATES:
+                status.lastUpdatedTime = to_k8s_date(now)
+            return
+
         update_start_time = await self.crawl_ops.get_crawl_exec_last_update_time(
-            crawl.db_crawl_id
+            crawl.db_crawl_id, crawl.is_qa
         )
 
         if not update_start_time:
+            print("Crawl first started, webhooks called", now, crawl.id)
+            # call initial running webhook
+            if not crawl.qa_source_crawl_id:
+                self.run_task(
+                    self.event_webhook_ops.create_crawl_started_notification(
+                        crawl.id, crawl.oid, scheduled=crawl.scheduled
+                    )
+                )
+            else:
+                self.run_task(
+                    self.event_webhook_ops.create_qa_analysis_started_notification(
+                        crawl.id, crawl.oid, crawl.qa_source_crawl_id
+                    )
+                )
+
             await self.crawl_ops.inc_crawl_exec_time(
                 crawl.db_crawl_id, crawl.is_qa, 0, now
             )
@@ -1414,7 +1421,7 @@ async def mark_finished(
 
         finished = dt_now()
 
-        allowed_from = RUNNING_AND_STARTING_STATES
+        allowed_from = RUNNING_AND_WAITING_STATES
 
         # if set_state returns false, already set to same status, return
         if not await self.set_state(

backend/btrixcloud/operator/models.py (3 changes: 3 additions & 0 deletions)
@@ -223,3 +223,6 @@ class CrawlStatus(BaseModel):
 
     # don't include in status, use by metacontroller
     resync_after: Optional[int] = Field(default=None, exclude=True)
+
+    # last state
+    last_state: TYPE_ALL_CRAWL_STATES = Field(default="starting", exclude=True)

backend/btrixcloud/orgs.py (4 changes: 2 additions & 2 deletions)
@@ -27,7 +27,7 @@
 from .models import (
     SUCCESSFUL_STATES,
     RUNNING_STATES,
-    STARTING_STATES,
+    WAITING_STATES,
     BaseCrawl,
     Organization,
     StorageRef,
@@ -890,7 +890,7 @@ async def get_org_metrics(self, org: Organization) -> dict[str, int]:
             {"oid": org.id, "state": {"$in": RUNNING_STATES}}
         )
         workflows_queued_count = await self.crawls_db.count_documents(
-            {"oid": org.id, "state": {"$in": STARTING_STATES}}
+            {"oid": org.id, "state": {"$in": WAITING_STATES}}
         )
         collections_count = await self.colls_db.count_documents({"oid": org.id})
         public_collections_count = await self.colls_db.count_documents(

chart/templates/operators.yaml (2 changes: 1 addition & 1 deletion)
@@ -5,7 +5,7 @@ metadata:
   name: crawljobs-operator
 spec:
   generateSelector: false
-  resyncPeriodSeconds: {{ .Values.operator_resync_seconds | default 30 }}
+  resyncPeriodSeconds: {{ .Values.operator_resync_seconds | default 10 }}
   parentResource:
     apiVersion: btrix.cloud/v1
     resource: crawljobs
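
A shorter resync period also bounds how much execution time can be misattributed around a state transition: roughly one resync interval per running pod. A back-of-the-envelope sketch, with illustrative pod counts that are not from the commit:

    def worst_case_error_seconds(resync_period_secs: int, num_pods: int) -> int:
        """Rough upper bound on exec time misattributed around one transition."""
        return resync_period_secs * num_pods


    print(worst_case_error_seconds(30, 4))  # old default: up to 120 pod-seconds
    print(worst_case_error_seconds(10, 4))  # new default: up to 40 pod-seconds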

@@ -802,6 +802,7 @@ export class ArchivedItemDetail extends TailwindElement {
                 ? html`<span
                     >${humanizeExecutionSeconds(
                       this.crawl!.crawlExecSeconds,
+                      { displaySeconds: true },
                     )}</span
                   >`
                 : html`<span class="text-0-400">${msg("Pending")}</span>`}
