diff --git a/backend/internal/app/orchestrator/app.go b/backend/internal/app/orchestrator/app.go index 44930ea..03bd342 100644 --- a/backend/internal/app/orchestrator/app.go +++ b/backend/internal/app/orchestrator/app.go @@ -129,6 +129,11 @@ func (a *App) Run(ctx context.Context) error { if err != nil { log.Warn("can't check pings from agents", sl.Err(err)) } + + err = a.OrchestratorApp.FindForgottenExpressions(ctx, a.Producer) + if err != nil { + log.Warn("can't find forgotten expressions", sl.Err(err)) + } case <-ctx.Done(): log.Error("orchestrator stopped") diff --git a/backend/internal/orchestrator/orchestrator.go b/backend/internal/orchestrator/orchestrator.go index 7973e5d..640b1da 100644 --- a/backend/internal/orchestrator/orchestrator.go +++ b/backend/internal/orchestrator/orchestrator.go @@ -180,6 +180,71 @@ func (o *Orchestrator) CheckPing(ctx context.Context, producer brokers.Producer) return nil } +// FindForgottenExpressions that aren't processed by anyone. +func (o *Orchestrator) FindForgottenExpressions(ctx context.Context, producer brokers.Producer) error { + const fn = "orchestrator.FindForgottenExpressions" + + log := o.log.With( + slog.String("fn", fn), + ) + + tx, err := o.dbConfig.DB.Begin() + if err != nil { + return err + } + + qtx := o.dbConfig.Queries.WithTx(tx) + + agentIDs, err := qtx.GetBusyAgents(ctx) + if err != nil { + return err + } + + if len(agentIDs) != 0 { + errCommit := tx.Commit() + if errCommit != nil { + log.Error("can't commit transaction") + + return errCommit + } + return nil + } + + expressions, err := qtx.GetExpressionWithStatusComputing(ctx) + if err != nil { + log.Error("can't get expressions", sl.Err(err)) + errRollback := tx.Rollback() + if errRollback != nil { + log.Error("can't rollback transaction") + + return errRollback + } + return err + } + + err = tx.Commit() + if err != nil { + log.Error("can't commit transaction", sl.Err(err)) + + return err + } + + if len(expressions) == 0 { + return nil + } + + for _, expr := range expressions { + msgToQueue := messages.ExpressionMessage{ + ExpressionID: expr.ExpressionID, + Expression: expr.ParseData, + UserID: expr.UserID, + } + o.AddTask(msgToQueue, producer) + } + + return nil +} + // HandlePing accepts ping from agent. func (o *Orchestrator) HandlePing(ctx context.Context, agentID int32) error { const fn = "orchestrator.HandlePing" diff --git a/backend/internal/storage/postgres/agents.sql.go b/backend/internal/storage/postgres/agents.sql.go index 8e1af86..326b761 100644 --- a/backend/internal/storage/postgres/agents.sql.go +++ b/backend/internal/storage/postgres/agents.sql.go @@ -97,6 +97,35 @@ func (q *Queries) GetAgents(ctx context.Context) ([]Agent, error) { return items, nil } +const getBusyAgents = `-- name: GetBusyAgents :many +SELECT agent_id +FROM agents +WHERE number_of_active_calculations != 0 +` + +func (q *Queries) GetBusyAgents(ctx context.Context) ([]int32, error) { + rows, err := q.db.QueryContext(ctx, getBusyAgents) + if err != nil { + return nil, err + } + defer rows.Close() + var items []int32 + for rows.Next() { + var agent_id int32 + if err := rows.Scan(&agent_id); err != nil { + return nil, err + } + items = append(items, agent_id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const incrementNumberOfActiveCalculations = `-- name: IncrementNumberOfActiveCalculations :exec UPDATE agents SET number_of_active_calculations = number_of_active_calculations + 1 diff --git a/backend/internal/storage/postgres/expressions.sql.go b/backend/internal/storage/postgres/expressions.sql.go index 3ade510..bd42111 100644 --- a/backend/internal/storage/postgres/expressions.sql.go +++ b/backend/internal/storage/postgres/expressions.sql.go @@ -143,6 +143,50 @@ func (q *Queries) GetExpressionByID(ctx context.Context, expressionID int32) (Ex return i, err } +const getExpressionWithStatusComputing = `-- name: GetExpressionWithStatusComputing :many +SELECT + expression_id, user_id, agent_id, + created_at, updated_at, data, parse_data, + status, result, is_ready +FROM expressions +WHERE status = 'computing' +ORDER BY created_at DESC +` + +func (q *Queries) GetExpressionWithStatusComputing(ctx context.Context) ([]Expression, error) { + rows, err := q.db.QueryContext(ctx, getExpressionWithStatusComputing) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Expression + for rows.Next() { + var i Expression + if err := rows.Scan( + &i.ExpressionID, + &i.UserID, + &i.AgentID, + &i.CreatedAt, + &i.UpdatedAt, + &i.Data, + &i.ParseData, + &i.Status, + &i.Result, + &i.IsReady, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getExpressions = `-- name: GetExpressions :many SELECT expression_id, user_id, agent_id, diff --git a/backend/sql/queries/agents.sql b/backend/sql/queries/agents.sql index a228fdb..d526c11 100644 --- a/backend/sql/queries/agents.sql +++ b/backend/sql/queries/agents.sql @@ -49,4 +49,9 @@ RETURNING agent_id; -- name: TerminateOldAgents :exec DELETE FROM agents -WHERE status = 'terminated'; \ No newline at end of file +WHERE status = 'terminated'; + +-- name: GetBusyAgents :many +SELECT agent_id +FROM agents +WHERE number_of_active_calculations != 0; \ No newline at end of file diff --git a/backend/sql/queries/expressions.sql b/backend/sql/queries/expressions.sql index 4a713ae..8024493 100644 --- a/backend/sql/queries/expressions.sql +++ b/backend/sql/queries/expressions.sql @@ -66,4 +66,13 @@ ORDER BY created_at DESC; -- name: AssignExpressionToAgent :exec UPDATE expressions SET agent_id = $1 -WHERE expression_id = $2; \ No newline at end of file +WHERE expression_id = $2; + +-- name: GetExpressionWithStatusComputing :many +SELECT + expression_id, user_id, agent_id, + created_at, updated_at, data, parse_data, + status, result, is_ready +FROM expressions +WHERE status = 'computing' +ORDER BY created_at DESC; \ No newline at end of file