Skip to content

Commit

Permalink
add rebooting agents
Browse files Browse the repository at this point in the history
  • Loading branch information
Prrromanssss committed Apr 21, 2024
1 parent a24d3de commit d93195f
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 2 deletions.
5 changes: 5 additions & 0 deletions backend/internal/app/orchestrator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (a *App) Run(ctx context.Context) error {
if err != nil {
log.Warn("can't check pings from agents", sl.Err(err))
}

err = a.OrchestratorApp.FindForgottenExpressions(ctx, a.Producer)
if err != nil {
log.Warn("can't find forgotten expressions", sl.Err(err))
}
case <-ctx.Done():
log.Error("orchestrator stopped")

Expand Down
65 changes: 65 additions & 0 deletions backend/internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,71 @@ func (o *Orchestrator) CheckPing(ctx context.Context, producer brokers.Producer)
return nil
}

// FindForgottenExpressions that aren't processed by anyone.
func (o *Orchestrator) FindForgottenExpressions(ctx context.Context, producer brokers.Producer) error {
const fn = "orchestrator.FindForgottenExpressions"

log := o.log.With(
slog.String("fn", fn),
)

tx, err := o.dbConfig.DB.Begin()
if err != nil {
return err
}

qtx := o.dbConfig.Queries.WithTx(tx)

agentIDs, err := qtx.GetBusyAgents(ctx)
if err != nil {
return err
}

if len(agentIDs) != 0 {
errCommit := tx.Commit()
if errCommit != nil {
log.Error("can't commit transaction")

return errCommit
}
return nil
}

expressions, err := qtx.GetExpressionWithStatusComputing(ctx)
if err != nil {
log.Error("can't get expressions", sl.Err(err))
errRollback := tx.Rollback()
if errRollback != nil {
log.Error("can't rollback transaction")

return errRollback
}
return err
}

err = tx.Commit()
if err != nil {
log.Error("can't commit transaction", sl.Err(err))

return err
}

if len(expressions) == 0 {
return nil
}

for _, expr := range expressions {
msgToQueue := messages.ExpressionMessage{
ExpressionID: expr.ExpressionID,
Expression: expr.ParseData,
UserID: expr.UserID,
}
o.AddTask(msgToQueue, producer)
}

return nil
}

// HandlePing accepts ping from agent.
func (o *Orchestrator) HandlePing(ctx context.Context, agentID int32) error {
const fn = "orchestrator.HandlePing"
Expand Down
29 changes: 29 additions & 0 deletions backend/internal/storage/postgres/agents.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions backend/internal/storage/postgres/expressions.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion backend/sql/queries/agents.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ RETURNING agent_id;

-- name: TerminateOldAgents :exec
DELETE FROM agents
WHERE status = 'terminated';
WHERE status = 'terminated';

-- name: GetBusyAgents :many
SELECT agent_id
FROM agents
WHERE number_of_active_calculations != 0;
11 changes: 10 additions & 1 deletion backend/sql/queries/expressions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,13 @@ ORDER BY created_at DESC;
-- name: AssignExpressionToAgent :exec
UPDATE expressions
SET agent_id = $1
WHERE expression_id = $2;
WHERE expression_id = $2;

-- name: GetExpressionWithStatusComputing :many
SELECT
expression_id, user_id, agent_id,
created_at, updated_at, data, parse_data,
status, result, is_ready
FROM expressions
WHERE status = 'computing'
ORDER BY created_at DESC;

0 comments on commit d93195f

Please sign in to comment.