Skip to content

Commit

Permalink
fix(orchestrator): slightly improve the timeout query
Browse files Browse the repository at this point in the history
simplifying slightly the where clause selecting tasks that have timeout
out from
```
CREATED AND created_timeout < NOW
OR STARTED AND heartbeat_timeout < NOW
OR STARTED AND completed_timeout < NOW
```
to
```
CREATED AND created_timeout < NOW
OR STARTED AND (heartbeat_timeout < NOW OR completed_timeout < NOW`)
```

Query plan before
```
Update on tasks  (cost=17.97..26.04 rows=1 width=91)
  ->  Nested Loop  (cost=17.97..26.04 rows=1 width=91)
        ->  HashAggregate  (cost=17.41..17.42 rows=1 width=56)
              Group Key: "ANY_subquery".id
              ->  Subquery Scan on "ANY_subquery"  (cost=13.33..17.40 rows=1 width=56)
                    ->  LockRows  (cost=13.33..17.39 rows=1 width=22)
                          ->  Bitmap Heap Scan on tasks tasks_1  (cost=13.33..17.38 rows=1 width=22)
                                Recheck Cond: ((state = 'CREATED'::nango_scheduler.task_states) OR (state = 'STARTED'::nango_scheduler.task_states) OR (state = 'STARTED'::nango_scheduler.task_states))
                                Filter: (((state = 'CREATED'::nango_scheduler.task_states) AND ((starts_after + ((created_to_started_timeout_secs)::double precision * '00:00:01'::interval)) < CURRENT_TIMESTAMP)) OR ((state = 'STARTED'::nango_scheduler.task_states) AND ((last_heartbeat_at + ((heartbeat_timeout_secs)::double precision * '00:00:01'::interval)) < CURRENT_TIMESTAMP)) OR ((state = 'STARTED'::nango_scheduler.task_states) AND ((last_state_transition_at + ((started_to_completed_timeout_secs)::double precision * '00:00:01'::interval)) < CURRENT_TIMESTAMP)))
                                ->  BitmapOr  (cost=13.33..13.33 rows=1 width=0)
                                      ->  Bitmap Index Scan on idx_tasks_state  (cost=0.00..4.44 rows=1 width=0)
                                            Index Cond: (state = 'CREATED'::nango_scheduler.task_states)
                                      ->  Bitmap Index Scan on idx_tasks_state  (cost=0.00..4.44 rows=1 width=0)
                                            Index Cond: (state = 'STARTED'::nango_scheduler.task_states)
                                      ->  Bitmap Index Scan on idx_tasks_state  (cost=0.00..4.44 rows=1 width=0)
                                            Index Cond: (state = 'STARTED'::nango_scheduler.task_states)
        ->  Index Scan using tasks_pkey on tasks  (cost=0.56..8.58 rows=1 width=296)
              Index Cond: (id = "ANY_subquery".id)
```

Query plan after
```
Update on tasks t  (cost=13.55..21.59 rows=1 width=123)
  CTE eligible_tasks
    ->  LockRows  (cost=8.89..12.99 rows=1 width=292)
          ->  Bitmap Heap Scan on tasks  (cost=8.89..12.98 rows=1 width=292)
                Recheck Cond: ((state = 'CREATED'::nango_scheduler.task_states) OR (state = 'STARTED'::nango_scheduler.task_states))
                Filter: (((state = 'CREATED'::nango_scheduler.task_states) AND ((starts_after + ((created_to_started_timeout_secs)::double precision * '00:00:01'::interval)) < CURRENT_TIMESTAMP)) OR ((state = 'STARTED'::nango_scheduler.task_states) AND (((last_heartbeat_at + ((heartbeat_timeout_secs)::double precision * '00:00:01'::interval)) < CURRENT_TIMESTAMP) OR ((last_state_transition_at + ((started_to_completed_timeout_secs)::double precision * '00:00:01'::interval)) < CURRENT_TIMESTAMP))))
                ->  BitmapOr  (cost=8.89..8.89 rows=1 width=0)
                      ->  Bitmap Index Scan on idx_tasks_state  (cost=0.00..4.44 rows=1 width=0)
                            Index Cond: (state = 'CREATED'::nango_scheduler.task_states)
                      ->  Bitmap Index Scan on idx_tasks_state  (cost=0.00..4.44 rows=1 width=0)
                            Index Cond: (state = 'STARTED'::nango_scheduler.task_states)
  ->  Nested Loop  (cost=0.56..8.60 rows=1 width=123)
        ->  CTE Scan on eligible_tasks e  (cost=0.00..0.02 rows=1 width=120)
        ->  Index Scan using tasks_pkey on tasks t  (cost=0.56..8.58 rows=1 width=22)
              Index Cond: (id = e.id)
```

I switched to use knex.raw because using knex syntax isn't shorter or easier to read
I am also using a CTE instead of a IN subquery to make it slightly easier to read
  • Loading branch information
TBonnin committed Oct 3, 2024
1 parent 195e526 commit 70686f1
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions packages/scheduler/lib/models/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,41 +263,41 @@ export async function dequeue(db: knex.Knex, { groupKey, limit }: { groupKey: st

export async function expiresIfTimeout(db: knex.Knex): Promise<Result<Task[]>> {
try {
const tasks = await db
.update({
state: 'EXPIRED',
last_state_transition_at: new Date(),
terminated: true,
output: db.raw(`
const { rows: tasks } = await db.raw<{ rows: DbTask[] }>(
`
WITH eligible_tasks AS (
SELECT id, state, output,
CASE
WHEN state = 'CREATED' AND starts_after + created_to_started_timeout_secs * INTERVAL '1 seconds' < CURRENT_TIMESTAMP THEN '{"reason": "createdToStartedTimeoutSecs_exceeded"}'
WHEN state = 'STARTED' AND last_heartbeat_at + heartbeat_timeout_secs * INTERVAL '1 seconds' < CURRENT_TIMESTAMP THEN '{"reason": "heartbeatTimeoutSecs_exceeded"}'
WHEN state = 'STARTED' AND last_state_transition_at + started_to_completed_timeout_secs * INTERVAL '1 seconds' < CURRENT_TIMESTAMP THEN '{"reason": "startedToCompletedTimeoutSecs_exceeded"}'
WHEN state = 'CREATED' AND starts_after + created_to_started_timeout_secs * INTERVAL '1 second' < CURRENT_TIMESTAMP
THEN '{"reason": "createdToStartedTimeoutSecs_exceeded"}'::json
WHEN state = 'STARTED' AND last_heartbeat_at + heartbeat_timeout_secs * INTERVAL '1 second' < CURRENT_TIMESTAMP
THEN '{"reason": "heartbeatTimeoutSecs_exceeded"}'::json
WHEN state = 'STARTED' AND last_state_transition_at + started_to_completed_timeout_secs * INTERVAL '1 second' < CURRENT_TIMESTAMP
THEN '{"reason": "startedToCompletedTimeoutSecs_exceeded"}'::json
ELSE output
END
`)
})
.from<DbTask>(TASKS_TABLE)
.whereIn(
'id',
db
.select('id')
.from<DbTask>(TASKS_TABLE)
.where((builder) => {
builder
.where({ state: 'CREATED' })
.andWhere(db.raw(`starts_after + created_to_started_timeout_secs * INTERVAL '1 seconds' < CURRENT_TIMESTAMP`));
builder
.orWhere({ state: 'STARTED' })
.andWhere(db.raw(`last_heartbeat_at + heartbeat_timeout_secs * INTERVAL '1 seconds' < CURRENT_TIMESTAMP`));
builder
.orWhere({ state: 'STARTED' })
.andWhere(db.raw(`last_state_transition_at + started_to_completed_timeout_secs * INTERVAL '1 seconds' < CURRENT_TIMESTAMP`));
})
.forUpdate()
.skipLocked()
END AS reason
FROM ${TASKS_TABLE}
WHERE (
state = 'CREATED' AND starts_after + created_to_started_timeout_secs * INTERVAL '1 second' < CURRENT_TIMESTAMP)
OR (
state = 'STARTED'
AND (
last_heartbeat_at + heartbeat_timeout_secs * INTERVAL '1 second' < CURRENT_TIMESTAMP
OR last_state_transition_at + started_to_completed_timeout_secs * INTERVAL '1 second' < CURRENT_TIMESTAMP
)
)
FOR UPDATE SKIP LOCKED
)
.returning('*');
UPDATE ${TASKS_TABLE} t
SET state = 'EXPIRED',
last_state_transition_at = CURRENT_TIMESTAMP,
terminated = TRUE,
output = e.reason
FROM eligible_tasks e
WHERE t.id = e.id
RETURNING t.*;
`
);
if (!tasks?.[0]) {
return Ok([]);
}
Expand Down

0 comments on commit 70686f1

Please sign in to comment.