Execution time tracking tweaks #1994

Merged · 5 commits · Aug 6, 2024
backend/btrixcloud/basecrawls.py — 2 additions & 2 deletions

```diff
@@ -23,7 +23,7 @@
     PaginatedCrawlOutResponse,
     User,
     StorageRef,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     QARun,
     UpdatedResponse,
@@ -272,7 +272,7 @@ async def update_crawl_state(self, crawl_id: str, state: str):
             {
                 "_id": crawl_id,
                 "type": "crawl",
-                "state": {"$in": RUNNING_AND_STARTING_STATES},
+                "state": {"$in": RUNNING_AND_WAITING_STATES},
             },
             {"$set": data},
         )
```
backend/btrixcloud/crawls.py — 16 additions & 10 deletions

```diff
@@ -44,7 +44,7 @@
     PaginatedCrawlOutResponse,
     PaginatedSeedResponse,
     PaginatedCrawlErrorResponse,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     NON_RUNNING_STATES,
     ALL_CRAWL_STATES,
@@ -165,7 +165,7 @@ async def list_crawls(
             query["userid"] = userid

         if running_only:
-            query["state"] = {"$in": RUNNING_AND_STARTING_STATES}
+            query["state"] = {"$in": RUNNING_AND_WAITING_STATES}

         # Override running_only if state list is explicitly passed
         if state:
@@ -425,7 +425,7 @@ async def get_crawl_queue(

         state, _ = await self.get_crawl_state(crawl_id, False)

-        if state not in RUNNING_AND_STARTING_STATES:
+        if state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")

         total = 0
@@ -463,7 +463,7 @@ async def match_crawl_queue(
         limit <= next_offset < limit + step"""
         state, _ = await self.get_crawl_state(crawl_id, False)

-        if state not in RUNNING_AND_STARTING_STATES:
+        if state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")

         total = 0
@@ -513,7 +513,7 @@ async def add_or_remove_exclusion(

         crawl = await self.get_crawl(crawl_id, org)

-        if crawl.state not in RUNNING_AND_STARTING_STATES:
+        if crawl.state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")

         cid = crawl.cid
@@ -591,30 +591,36 @@ async def inc_crawl_exec_time(
                 "qaCrawlExecSeconds": exec_time,
                 "qa.crawlExecSeconds": exec_time,
             }
+            field = "qa._lut"
         else:
             inc_update = {"crawlExecSeconds": exec_time}
+            field = "_lut"

         res = await self.crawls.find_one_and_update(
             {
                 "_id": crawl_id,
                 "type": "crawl",
-                "_lut": {"$ne": last_updated_time},
+                field: {"$ne": last_updated_time},
             },
             {
                 "$inc": inc_update,
-                "$set": {"_lut": last_updated_time},
+                "$set": {field: last_updated_time},
             },
         )
         return res is not None

     async def get_crawl_exec_last_update_time(
-        self, crawl_id: str
+        self, crawl_id: str, is_qa: bool
     ) -> Optional[datetime]:
         """get crawl last updated time"""
+        field = "_lut" if not is_qa else "qa._lut"
         res = await self.crawls.find_one(
-            {"_id": crawl_id, "type": "crawl"}, projection=["_lut"]
+            {"_id": crawl_id, "type": "crawl"}, projection=[field]
         )
-        return res and res.get("_lut")
+        if not res:
+            return None
+
+        return res.get("qa", {}).get("_lut") if is_qa else res.get("_lut")

     async def get_crawl_state(
         self, crawl_id: str, is_qa: bool
```
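The guarded `find_one_and_update` above is what makes exec-time increments idempotent across operator resyncs: an increment is applied only if the incoming window timestamp differs from the stored `_lut` (or `qa._lut`) marker. A minimal standalone sketch of the same pattern, assuming a Motor collection shaped like `crawls`; the helper name is illustrative, and the QA branch is simplified to a single counter rather than the two fields the real method increments:

```python
from datetime import datetime

from motor.motor_asyncio import AsyncIOMotorCollection


async def inc_exec_time_once(
    crawls: AsyncIOMotorCollection,
    crawl_id: str,
    is_qa: bool,
    exec_time: int,
    window: datetime,
) -> bool:
    """Add exec seconds at most once per update window.

    The `field: {"$ne": window}` filter means a replayed window matches
    no document, so the $inc is skipped and the call reports False.
    """
    field = "qa._lut" if is_qa else "_lut"
    inc = {"qaCrawlExecSeconds": exec_time} if is_qa else {"crawlExecSeconds": exec_time}
    res = await crawls.find_one_and_update(
        {"_id": crawl_id, "type": "crawl", field: {"$ne": window}},
        {"$inc": inc, "$set": {field: window}},
    )
    return res is not None
```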
backend/btrixcloud/models.py — 6 additions & 6 deletions

```diff
@@ -208,8 +208,8 @@ class UserOut(BaseModel):
 ]
 RUNNING_STATES = get_args(TYPE_RUNNING_STATES)

-TYPE_STARTING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
-STARTING_STATES = get_args(TYPE_STARTING_STATES)
+TYPE_WAITING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
+WAITING_STATES = get_args(TYPE_WAITING_STATES)

 TYPE_FAILED_STATES = Literal[
     "canceled",
@@ -228,18 +228,18 @@ class UserOut(BaseModel):
 ]
 SUCCESSFUL_STATES = get_args(TYPE_SUCCESSFUL_STATES)

-TYPE_RUNNING_AND_STARTING_STATES = Literal[TYPE_STARTING_STATES, TYPE_RUNNING_STATES]
-RUNNING_AND_STARTING_STATES = [*STARTING_STATES, *RUNNING_STATES]
+TYPE_RUNNING_AND_WAITING_STATES = Literal[TYPE_WAITING_STATES, TYPE_RUNNING_STATES]
+RUNNING_AND_WAITING_STATES = [*WAITING_STATES, *RUNNING_STATES]

 RUNNING_AND_STARTING_ONLY = ["starting", *RUNNING_STATES]

 TYPE_NON_RUNNING_STATES = Literal[TYPE_FAILED_STATES, TYPE_SUCCESSFUL_STATES]
 NON_RUNNING_STATES = [*FAILED_STATES, *SUCCESSFUL_STATES]

 TYPE_ALL_CRAWL_STATES = Literal[
-    TYPE_RUNNING_AND_STARTING_STATES, TYPE_NON_RUNNING_STATES
+    TYPE_RUNNING_AND_WAITING_STATES, TYPE_NON_RUNNING_STATES
 ]
-ALL_CRAWL_STATES = [*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES]
+ALL_CRAWL_STATES = [*RUNNING_AND_WAITING_STATES, *NON_RUNNING_STATES]


 # ============================================================================
```
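The `Literal` plus `get_args` pattern these definitions use keeps the type-level and runtime views of each state group in lockstep, and nested `Literal` aliases flatten, which is how the combined groups are built. A tiny self-contained illustration — the waiting members are taken from the hunk above; the running members are an illustrative subset, not the real list:

```python
from typing import Literal, get_args

# Define the type first, then derive the runtime tuple from it with
# get_args() so the two views can never drift apart.
TYPE_WAITING = Literal["starting", "waiting_capacity", "waiting_org_limit"]
WAITING = get_args(TYPE_WAITING)

# Literal accepts other Literal aliases and flattens their members,
# which is how the combined state types are composed.
TYPE_RUNNING = Literal["running", "uploading-wacz"]  # illustrative subset
TYPE_ACTIVE = Literal[TYPE_WAITING, TYPE_RUNNING]

ACTIVE = [*WAITING, *get_args(TYPE_RUNNING)]
assert set(get_args(TYPE_ACTIVE)) == set(ACTIVE)
```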
backend/btrixcloud/operator/crawls.py — 30 additions & 23 deletions

```diff
@@ -19,8 +19,9 @@
     TYPE_ALL_CRAWL_STATES,
     NON_RUNNING_STATES,
     RUNNING_STATES,
+    WAITING_STATES,
     RUNNING_AND_STARTING_ONLY,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     FAILED_STATES,
     CrawlStats,
@@ -119,6 +120,7 @@ async def sync_crawls(self, data: MCSyncData):
         """sync crawls"""

         status = CrawlStatus(**data.parent.get("status", {}))
+        status.last_state = status.state

         spec = data.parent.get("spec", {})
         crawl_id = spec["id"]
@@ -250,11 +252,6 @@

         else:
             status.scale = 1
-            now = dt_now()
-            await self.crawl_ops.inc_crawl_exec_time(
-                crawl.db_crawl_id, crawl.is_qa, 0, now
-            )
-            status.lastUpdatedTime = to_k8s_date(now)

         children = self._load_redis(params, status, data.children)

@@ -807,25 +804,13 @@ async def sync_crawl_state(
             status.resync_after = self.fast_retry_secs
             return status

-        # if true (state is set), also run webhook
-        if await self.set_state(
+        # ensure running state is set
+        await self.set_state(
             "running",
             status,
             crawl,
             allowed_from=["starting", "waiting_capacity"],
-        ):
-            if not crawl.qa_source_crawl_id:
-                self.run_task(
-                    self.event_webhook_ops.create_crawl_started_notification(
-                        crawl.id, crawl.oid, scheduled=crawl.scheduled
-                    )
-                )
-            else:
-                self.run_task(
-                    self.event_webhook_ops.create_qa_analysis_started_notification(
-                        crawl.id, crawl.oid, crawl.qa_source_crawl_id
-                    )
-                )
+        )

         # update lastActiveTime if crawler is running
         if crawler_running:
@@ -967,11 +952,33 @@ async def increment_pod_exec_time(
         """inc exec time tracking"""
         now = dt_now()

+        # don't count time crawl is not running
+        if status.state in WAITING_STATES:
+            # reset lastUpdatedTime if at least 2 consecutive updates of non-running state
+            if status.last_state in WAITING_STATES:
+                status.lastUpdatedTime = to_k8s_date(now)
+            return
+
         update_start_time = await self.crawl_ops.get_crawl_exec_last_update_time(
-            crawl.db_crawl_id
+            crawl.db_crawl_id, crawl.is_qa
         )

         if not update_start_time:
+            print("Crawl first started, webhooks called", now, crawl.id)
+            # call initial running webhook
+            if not crawl.qa_source_crawl_id:
+                self.run_task(
+                    self.event_webhook_ops.create_crawl_started_notification(
+                        crawl.id, crawl.oid, scheduled=crawl.scheduled
+                    )
+                )
+            else:
+                self.run_task(
+                    self.event_webhook_ops.create_qa_analysis_started_notification(
+                        crawl.id, crawl.oid, crawl.qa_source_crawl_id
+                    )
+                )
+
             await self.crawl_ops.inc_crawl_exec_time(
                 crawl.db_crawl_id, crawl.is_qa, 0, now
             )
@@ -1414,7 +1421,7 @@ async def mark_finished(

         finished = dt_now()

-        allowed_from = RUNNING_AND_STARTING_STATES
+        allowed_from = RUNNING_AND_WAITING_STATES

         # if set_state returns false, already set to same status, return
         if not await self.set_state(
```
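The waiting-state check above stops the exec-time clock while a crawl is queued: once a crawl has been in a waiting state for at least two consecutive syncs, `lastUpdatedTime` is pushed forward to `now`, so the queued gap is never billed once the crawl transitions to running. A self-contained simulation of just that window rule — the state names come from `WAITING_STATES`; the function and scenario are illustrative:

```python
from datetime import datetime, timedelta, timezone

WAITING_STATES = ("starting", "waiting_capacity", "waiting_org_limit")


def next_window_start(
    state: str, last_state: str, window_start: datetime, now: datetime
) -> datetime:
    """Return the start of the billable window after one operator sync.

    While a crawl sits in a waiting state for two consecutive syncs,
    the window start is advanced to `now`; queued time is therefore
    excluded once the crawl starts running.
    """
    if state in WAITING_STATES and last_state in WAITING_STATES:
        return now
    return window_start


t0 = datetime(2024, 8, 6, tzinfo=timezone.utc)
start = t0
syncs = [
    ("starting", "waiting_capacity"),       # still queued: window advances
    ("waiting_capacity", "waiting_capacity"),  # still queued: advances again
    ("waiting_capacity", "running"),        # now running: window is kept
]
for i, (prev, cur) in enumerate(syncs):
    start = next_window_start(cur, prev, start, t0 + timedelta(seconds=10 * (i + 1)))

# Billable time begins at the last waiting sync, not at crawl creation.
print(start)  # 2024-08-06 00:00:20+00:00
```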
backend/btrixcloud/operator/models.py — 3 additions & 0 deletions

```diff
@@ -223,3 +223,6 @@ class CrawlStatus(BaseModel):

     # don't include in status, use by metacontroller
     resync_after: Optional[int] = Field(default=None, exclude=True)
+
+    # last state
+    last_state: TYPE_ALL_CRAWL_STATES = Field(default="starting", exclude=True)
```
backend/btrixcloud/orgs.py — 2 additions & 2 deletions

```diff
@@ -27,7 +27,7 @@
 from .models import (
     SUCCESSFUL_STATES,
     RUNNING_STATES,
-    STARTING_STATES,
+    WAITING_STATES,
     BaseCrawl,
     Organization,
     StorageRef,
@@ -890,7 +890,7 @@ async def get_org_metrics(self, org: Organization) -> dict[str, int]:
             {"oid": org.id, "state": {"$in": RUNNING_STATES}}
         )
         workflows_queued_count = await self.crawls_db.count_documents(
-            {"oid": org.id, "state": {"$in": STARTING_STATES}}
+            {"oid": org.id, "state": {"$in": WAITING_STATES}}
         )
         collections_count = await self.colls_db.count_documents({"oid": org.id})
         public_collections_count = await self.colls_db.count_documents(
```
chart/templates/operators.yaml — 1 addition & 1 deletion

```diff
@@ -5,7 +5,7 @@ metadata:
   name: crawljobs-operator
 spec:
   generateSelector: false
-  resyncPeriodSeconds: {{ .Values.operator_resync_seconds | default 30 }}
+  resyncPeriodSeconds: {{ .Values.operator_resync_seconds | default 10 }}
   parentResource:
     apiVersion: btrix.cloud/v1
     resource: crawljobs
```
```diff
@@ -802,6 +802,7 @@ export class ArchivedItemDetail extends TailwindElement {
               ? html`<span
                   >${humanizeExecutionSeconds(
                     this.crawl!.crawlExecSeconds,
+                    { displaySeconds: true },
                   )}</span
                 >`
               : html`<span class="text-0-400">${msg("Pending")}</span>`}
```