From d4f47e5482436f881cbabe0c415ce564c4e53d0f Mon Sep 17 00:00:00 2001 From: Sourr_cream Date: Thu, 18 Apr 2024 00:36:39 +0300 Subject: [PATCH] reorganize structure --- backend/cmd/agent/main.go | 106 ++------ backend/cmd/agent_agregator/main.go | 108 -------- backend/cmd/orchestrator/main.go | 103 ++----- backend/config/local.yaml | 3 +- backend/internal/agent/agent.go | 26 +- .../agent_agregator/agent_agregator.go | 257 ------------------ backend/internal/app/agent/app.go | 117 ++++++++ backend/internal/app/orchestrator/app.go | 107 ++++++++ backend/internal/config/config.go | 11 +- .../domain/messages/orchestrator_message.go | 6 - .../http-server/handlers/expression.go | 10 +- backend/internal/lib/logger/setup/logger.go | 40 +++ .../lib/logger/setup/pretty_logger.go | 20 ++ backend/internal/orchestrator/orchestrator.go | 254 +++++++++++++++++ 14 files changed, 618 insertions(+), 550 deletions(-) delete mode 100644 backend/cmd/agent_agregator/main.go delete mode 100644 backend/internal/agent_agregator/agent_agregator.go create mode 100644 backend/internal/app/agent/app.go create mode 100644 backend/internal/app/orchestrator/app.go delete mode 100644 backend/internal/domain/messages/orchestrator_message.go create mode 100644 backend/internal/lib/logger/setup/logger.go create mode 100644 backend/internal/lib/logger/setup/pretty_logger.go diff --git a/backend/cmd/agent/main.go b/backend/cmd/agent/main.go index ad205e8..9259da2 100644 --- a/backend/cmd/agent/main.go +++ b/backend/cmd/agent/main.go @@ -1,101 +1,43 @@ -package agentservice +package main import ( "context" "log/slog" "time" - ag "github.com/Prrromanssss/DAEE-fullstack/internal/agent" + agentapp "github.com/Prrromanssss/DAEE-fullstack/internal/app/agent" "github.com/Prrromanssss/DAEE-fullstack/internal/config" - "github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers" - "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl" - "github.com/Prrromanssss/DAEE-fullstack/internal/rabbitmq" + "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/logcleaner" + "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/setup" "github.com/Prrromanssss/DAEE-fullstack/internal/storage" - "github.com/Prrromanssss/DAEE-fullstack/internal/storage/postgres" ) -// RunAgent makes all requirements needed to run agent and call AgentService to run it. -func RunAgent(log *slog.Logger, cfg *config.Config, dbCfg *storage.Storage) { - const fn = "agentservice.RunAgent" - - log = log.With( - slog.String("fn", fn), - ) - +func main() { ctxWithCancel, cancel := context.WithCancel(context.Background()) + defer cancel() - amqpCfg, err := rabbitmq.NewAMQPConfig(log, cfg.RabbitMQURL) - if err != nil { - cancel() - log.Error("can't create NewAMQPConfig", sl.Err(err)) - return - } - - producer, err := rabbitmq.NewAMQPProducer(log, amqpCfg, cfg.QueueForResultsFromAgents) - if err != nil { - cancel() - log.Error("can't create NewAMQPProducer", sl.Err(err)) - return - } - - consumer, err := rabbitmq.NewAMQPConsumer(log, amqpCfg, cfg.QueueForExpressionsToAgents) - if err != nil { - cancel() - log.Error("can't create NewAMQPConsumer", sl.Err(err)) - return - } + // Load Config + cfg := config.MustLoad() - agent, err := ag.NewAgent( - log, - dbCfg, - postgres.Agent{}, - 200, - cancel, + // Configuration Logger + log := setup.SetupLogger(cfg.Env, cfg.LogPathAgent) + log.Info( + "start agent", + slog.String("env", cfg.Env), + slog.String("version", "2"), ) - if err != nil { - cancel() - log.Error("can't create agent", sl.Err(err)) - return - } + log.Debug("debug messages are enabled") - go AgentService(ctxWithCancel, log, amqpCfg, producer, consumer, agent) -} + go logcleaner.CleanLog(10*time.Minute, cfg.LogPathAgent, 100) -// AgentService gets messages from SimpleComputers, handle these messages, -// sends pings to Agent Agregator. -func AgentService( - ctx context.Context, - log *slog.Logger, - amqpCfg *rabbitmq.AMQPConfig, - producer brokers.Producer, - consumer brokers.Consumer, - agent *ag.Agent, -) { - defer func() { - amqpCfg.Close() - producer.Close() - consumer.Close() - agent.MakeExpressionsTerminated(ctx) - }() + // Configuration Storage + dbCfg := storage.NewStorage(cfg.StorageURL) - go func() { - for msgFromAgentAgregator := range consumer.GetMessages() { - go agent.ConsumeMessageFromAgentAgregator(ctx, msgFromAgentAgregator) - } - }() - - ticker := time.NewTicker(time.Duration(agent.InactiveTime) * time.Second) - defer ticker.Stop() - - for { - select { - case result := <-agent.SimpleComputers: - go agent.ConsumeMessageFromComputers(ctx, result, producer) - case <-ctx.Done(): - agent.Terminate() - return - case <-ticker.C: - agent.Ping(producer) - } + // Configuration Agent + application, err := agentapp.New(log, cfg, dbCfg, cancel) + if err != nil { + panic(err) } + + go application.MustRun(ctxWithCancel) } diff --git a/backend/cmd/agent_agregator/main.go b/backend/cmd/agent_agregator/main.go deleted file mode 100644 index 03c20d8..0000000 --- a/backend/cmd/agent_agregator/main.go +++ /dev/null @@ -1,108 +0,0 @@ -package agentagregatorservice - -import ( - "context" - "log/slog" - - "github.com/Prrromanssss/DAEE-fullstack/internal/config" - "github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers" - "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl" - "github.com/Prrromanssss/DAEE-fullstack/internal/storage" - - agentagregator "github.com/Prrromanssss/DAEE-fullstack/internal/agent_agregator" - "github.com/Prrromanssss/DAEE-fullstack/internal/rabbitmq" -) - -// RunAgentAgregator makes all requirements needed to run agent and call AgregateAgents to run it. -func RunAgentAgregator( - log *slog.Logger, - cfg *config.Config, - dbCfg *storage.Storage, -) (*agentagregator.AgentAgregator, error) { - const fn = "agentagregatorservice.RunAgentAgregator" - - log = log.With( - slog.String("fn", fn), - ) - - ctxWithCancel, cancel := context.WithCancel(context.Background()) - - amqpCfg, err := rabbitmq.NewAMQPConfig(log, cfg.RabbitMQURL) - if err != nil { - cancel() - log.Error("can't create NewAMQPConfig", sl.Err(err)) - return nil, err - } - - producer, err := rabbitmq.NewAMQPProducer(log, amqpCfg, cfg.QueueForExpressionsToAgents) - if err != nil { - cancel() - log.Error("can't create NewAMQPProducer", sl.Err(err)) - return nil, err - } - - consumer, err := rabbitmq.NewAMQPConsumer(log, amqpCfg, cfg.QueueForResultsFromAgents) - if err != nil { - cancel() - log.Error("can't create NewAMQPConsumer", sl.Err(err)) - return nil, err - } - - agentAgregator, err := agentagregator.NewAgentAgregator( - log, - dbCfg, - cancel, - ) - if err != nil { - cancel() - log.Error("agent agregator error", sl.Err(err)) - return nil, err - } - - err = agentAgregator.ReloadComputingExpressions(ctxWithCancel) - if err != nil { - cancel() - log.Error("can't reload computing expressions", sl.Err(err)) - return nil, err - } - - go AgregateAgents(ctxWithCancel, log, amqpCfg, producer, consumer, agentAgregator) - - return agentAgregator, nil -} - -// AgregateAgents agregates agents, -// consumes messages from them and orchestrator, manages their job. -func AgregateAgents( - ctx context.Context, - log *slog.Logger, - amqpCfg *rabbitmq.AMQPConfig, - producer brokers.Producer, - consumer brokers.Consumer, - agentAg *agentagregator.AgentAgregator, -) { - defer func() { - amqpCfg.Close() - producer.Close() - consumer.Close() - }() - - go func() { - for msgFromAgents := range consumer.GetMessages() { - go agentAg.ConsumeMessagesFromAgents(ctx, msgFromAgents, producer) - } - }() - - for { - select { - case expressionMessage := <-agentAg.Tasks: - go agentAg.ConsumeMessagesFromOrchestrator(expressionMessage, producer) - case <-ctx.Done(): - log.Error("agent agregator stopped") - - // os.Exit(1) - return - } - - } -} diff --git a/backend/cmd/orchestrator/main.go b/backend/cmd/orchestrator/main.go index 4f27008..500d4ff 100644 --- a/backend/cmd/orchestrator/main.go +++ b/backend/cmd/orchestrator/main.go @@ -4,16 +4,14 @@ import ( "context" "log/slog" "net/http" - "os" "time" - agentservice "github.com/Prrromanssss/DAEE-fullstack/cmd/agent" - agentagregatorservice "github.com/Prrromanssss/DAEE-fullstack/cmd/agent_agregator" + orchestratorapp "github.com/Prrromanssss/DAEE-fullstack/internal/app/orchestrator" "github.com/Prrromanssss/DAEE-fullstack/internal/config" "github.com/Prrromanssss/DAEE-fullstack/internal/http-server/handlers" - "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/handlers/slogpretty" "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/logcleaner" + "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/setup" "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl" "github.com/Prrromanssss/DAEE-fullstack/internal/storage" @@ -21,54 +19,36 @@ import ( "github.com/go-chi/cors" ) -const ( - envLocal = "local" - envDev = "dev" - envProd = "prod" -) - func main() { - // Load config + ctxWithCancel, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Load Config cfg := config.MustLoad() - // Configuration logger - log := setupLogger(cfg.Env, cfg.LogPath) + // Configuration Logger + log := setup.SetupLogger(cfg.Env, cfg.LogPathOrchestrator) log.Info( - "start daee", + "start orchestrator", slog.String("env", cfg.Env), slog.String("version", "2"), ) log.Debug("debug messages are enabled") - go logcleaner.CleanLog(10*time.Minute, cfg.LogPath, 100) - - // Configuration storage + go logcleaner.CleanLog(10*time.Minute, cfg.LogPathOrchestrator, 100) + // Configuration Storage dbCfg := storage.NewStorage(cfg.StorageURL) - // Configuration AgentAgregator - agentAgr, err := agentagregatorservice.RunAgentAgregator(log, cfg, dbCfg) + // Configuration Orchestrator + application, err := orchestratorapp.New(log, cfg, dbCfg, cancel) if err != nil { - log.Error("can't make agent agregator", sl.Err(err)) - return + panic(err) } - // Delete previous agents - err = dbCfg.DB.DeleteAgents(context.Background()) - if err != nil { - log.Error("can't delete previous agents", sl.Err(err)) - } + go application.MustRun(ctxWithCancel) - // Create Agent1 - agentservice.RunAgent(log, cfg, dbCfg) - - // Create Agent2 - agentservice.RunAgent(log, cfg, dbCfg) - - // Create Agent3 - agentservice.RunAgent(log, cfg, dbCfg) - - // Configuration http-server + // Configuration HTTP-Server router := chi.NewRouter() router.Use(cors.Handler(cors.Options{ @@ -82,13 +62,16 @@ func main() { v1Router := chi.NewRouter() - // TODO: Get rid of agentAgr in endpoint!!!!!!!! - // Expression endpoints - v1Router.Post("/expressions", handlers.HandlerCreateExpression(log, dbCfg, agentAgr)) // <<<<----- + v1Router.Post("/expressions", handlers.HandlerCreateExpression( + log, + dbCfg, + application.OrchestratorApp, + application.Producer, + )) v1Router.Get("/expressions", handlers.HandlerGetExpressions(log, dbCfg)) - // Opeartsion endpoints + // Operation endpoints v1Router.Get("/operations", handlers.HandlerGetOperations(log, dbCfg)) v1Router.Patch("/operations", handlers.HandlerUpdateOperation(log, dbCfg)) @@ -112,43 +95,3 @@ func main() { log.Info("server stopped") } - -func setupLogger(env, logPath string) *slog.Logger { - var log *slog.Logger - - logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - if err != nil { - panic("failed to open log file: " + err.Error()) - } - defer logFile.Close() - - switch env { - case envLocal: - log = setupPrettySlog(logFile) - case envDev: - log = slog.New( - slog.NewJSONHandler(logFile, &slog.HandlerOptions{ - Level: slog.LevelDebug, - }), - ) - case envProd: - log = slog.New( - slog.NewJSONHandler(logFile, &slog.HandlerOptions{ - Level: slog.LevelInfo, - }), - ) - } - return log -} - -func setupPrettySlog(logFile *os.File) *slog.Logger { - opts := slogpretty.PrettyHandlerOptions{ - SlogOpts: &slog.HandlerOptions{ - Level: slog.LevelDebug, - }, - } - - handler := opts.NewPrettyHandler(logFile) - - return slog.New(handler) -} diff --git a/backend/config/local.yaml b/backend/config/local.yaml index e492043..6aa86cf 100644 --- a/backend/config/local.yaml +++ b/backend/config/local.yaml @@ -1,5 +1,6 @@ env: "local" -log_path: "./backend/daee.log" +log_path_agent: "./backend/agent.log" +log_path_orchestrator: "./backend/orchestrator.log" rabbit_queue: rabbitmq_url: "amqp://guest:guest@localhost:5672/" queue_for_expressions_to_agents: "Expressions to agents" diff --git a/backend/internal/agent/agent.go b/backend/internal/agent/agent.go index e840b59..b2d3147 100644 --- a/backend/internal/agent/agent.go +++ b/backend/internal/agent/agent.go @@ -50,6 +50,18 @@ func NewAgent( }, nil } +// DeletePreviousAgents deletes previous agents when the application is restarted. +func (a *Agent) DeletePreviousAgents(ctx context.Context) error { + err := a.dbConfig.DB.DeleteAgents(ctx) + if err != nil { + a.log.Error("can't delete previous agents", sl.Err(err)) + + return err + } + + return nil +} + // GetSafelyNumberOfActiveCalculations gets NumberOfActiveCalculations with Lock. func (a *Agent) GetSafelyNumberOfActiveCalculations() int32 { a.mu.Lock() @@ -266,25 +278,25 @@ func (a *Agent) ConsumeMessageFromComputers(ctx context.Context, result *message } } -// ConsumeMessageFromAgentAgregator hanldes message from Consumer. -func (a *Agent) ConsumeMessageFromAgentAgregator(ctx context.Context, msgFromAgentAgregator amqp.Delivery) { - const fn = "agent.ConsumeMessageFromAgentAgregator" +// ConsumeMessageFromOrchestrator hanldes message from Consumer. +func (a *Agent) ConsumeMessageFromOrchestrator(ctx context.Context, msgFromOrchestrator amqp.Delivery) { + const fn = "agent.ConsumeMessageFromOrchestrator" log := a.log.With( slog.String("fn", fn), ) - log.Info("agent consumes msg from agent agregator", slog.String("message", string(msgFromAgentAgregator.Body))) + log.Info("agent consumes msg from agent agregator", slog.String("message", string(msgFromOrchestrator.Body))) var exprMsg messages.ExpressionMessage - if err := json.Unmarshal(msgFromAgentAgregator.Body, &exprMsg); err != nil { + if err := json.Unmarshal(msgFromOrchestrator.Body, &exprMsg); err != nil { log.Error("agent error: failed to parse JSON", sl.Err(err)) a.kill() return } if a.GetSafelyNumberOfActiveCalculations() >= a.GetSafelyNumberOfParallelCalculations() { - err := msgFromAgentAgregator.Nack(false, true) + err := msgFromOrchestrator.Nack(false, true) if err != nil { log.Error("agent error", sl.Err(err)) a.kill() @@ -293,7 +305,7 @@ func (a *Agent) ConsumeMessageFromAgentAgregator(ctx context.Context, msgFromAge return } - err := msgFromAgentAgregator.Ack(false) + err := msgFromOrchestrator.Ack(false) if err != nil { log.Error("agent error: error acknowledging message", sl.Err(err)) a.kill() diff --git a/backend/internal/agent_agregator/agent_agregator.go b/backend/internal/agent_agregator/agent_agregator.go deleted file mode 100644 index 154ad19..0000000 --- a/backend/internal/agent_agregator/agent_agregator.go +++ /dev/null @@ -1,257 +0,0 @@ -package agentagregator - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "strconv" - "sync" - "time" - - "github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers" - "github.com/Prrromanssss/DAEE-fullstack/internal/domain/messages" - "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl" - "github.com/Prrromanssss/DAEE-fullstack/internal/orchestrator/parser" - "github.com/Prrromanssss/DAEE-fullstack/internal/storage" - "github.com/Prrromanssss/DAEE-fullstack/internal/storage/postgres" - - "github.com/streadway/amqp" -) - -type AgentAgregator struct { - log *slog.Logger - dbConfig *storage.Storage - Tasks chan messages.MessageFromOrchestrator - mu *sync.Mutex - kill context.CancelFunc -} - -// NewAgentAgregator creates new AgentAgregator. -func NewAgentAgregator( - log *slog.Logger, - dbCfg *storage.Storage, - kill context.CancelFunc, -) (*AgentAgregator, error) { - - return &AgentAgregator{ - log: log, - dbConfig: dbCfg, - Tasks: make(chan messages.MessageFromOrchestrator), - mu: &sync.Mutex{}, - kill: kill, - }, nil -} - -// AddTask adds task to AgentAgregator. -func (aa *AgentAgregator) AddTask(expressionMessage messages.MessageFromOrchestrator) { - aa.Tasks <- expressionMessage -} - -// ReloadComputingExpressions add not completed expressions to AgentAgregator. -func (aa *AgentAgregator) ReloadComputingExpressions( - ctx context.Context, -) error { - const fn = "agentagregator.ReloadComputingExpressions" - - expressions, err := aa.dbConfig.DB.GetComputingExpressions(ctx) - if err != nil { - return fmt.Errorf("orhestrator Error: %v, fn: %s", err, fn) - } - - for _, expr := range expressions { - msgToQueue := messages.MessageFromOrchestrator{ - ExpressionID: expr.ExpressionID, - Expression: expr.ParseData, - } - aa.AddTask(msgToQueue) - } - - return nil -} - -// HandlePing accepts ping from agent. -func (aa *AgentAgregator) HandlePing(ctx context.Context, agentID int32) error { - const fn = "agentagregator.HandlePing" - - err := aa.dbConfig.DB.UpdateAgentLastPing( - ctx, - postgres.UpdateAgentLastPingParams{ - AgentID: agentID, - LastPing: time.Now().UTC(), - }) - if err != nil { - return fmt.Errorf("can't update last ping: %v, fn: %s", err, fn) - } - return nil -} - -// HandleExpressionFromAgents makes expressions ready or publishes it again to queue. -func (aa *AgentAgregator) HandleExpression( - ctx context.Context, - exprMsg messages.ExpressionMessage, - producer brokers.Producer, -) error { - const fn = "agentagregator.HandleExpressionFromAgents" - - newResultAndToken, err := aa.UpdateExpressionFromAgents(ctx, exprMsg) - if err != nil { - return fmt.Errorf("agent agregator error: %v, fn: %s", err, fn) - } - - result, err := strconv.Atoi(newResultAndToken.Result) - - if err == nil && - parser.IsNumber(newResultAndToken.Result) || - (newResultAndToken.Result[0] == '-' && parser.IsNumber(newResultAndToken.Result[1:])) { - err := aa.UpdateExpressionToReady(ctx, result, exprMsg.ExpressionID) - if err != nil { - return fmt.Errorf("agent agregator error: %v, fn: %s", err, fn) - } - - return nil - } - if newResultAndToken.Token != "" { - err := producer.PublishExpressionMessage(&messages.ExpressionMessage{ - ExpressionID: exprMsg.ExpressionID, - Token: newResultAndToken.Token, - Expression: newResultAndToken.Result, - }) - if err != nil { - return fmt.Errorf("agent agregator error: %v, fn: %s", err, fn) - } - } - - return nil -} - -// UpdateExpressionFromAgents consumes updated messages and update appropiate fields in database. -func (aa *AgentAgregator) UpdateExpressionFromAgents( - ctx context.Context, - exprMsg messages.ExpressionMessage, -) (messages.ResultAndTokenMessage, error) { - const fn = "agentagregator.UpdateExpressionFromAgents" - - expression, err := aa.dbConfig.DB.GetExpressionByID( - ctx, - exprMsg.ExpressionID, - ) - if err != nil { - return messages.ResultAndTokenMessage{}, - fmt.Errorf("can't get expression by id: %v, fn: %s", err, fn) - } - - resAndTokenMsg, err := parser.InsertResultToToken( - expression.ParseData, - exprMsg.Token, - exprMsg.Result, - ) - if err != nil { - return messages.ResultAndTokenMessage{}, - fmt.Errorf("can't insert tokens to expression: %v, fn: %s", err, fn) - } - - err = aa.dbConfig.DB.UpdateExpressionParseData( - ctx, - postgres.UpdateExpressionParseDataParams{ - ExpressionID: exprMsg.ExpressionID, - ParseData: resAndTokenMsg.Result, - }) - if err != nil { - return messages.ResultAndTokenMessage{}, - fmt.Errorf("can't update expression data: %v, fn: %s", err, fn) - } - - return resAndTokenMsg, nil -} - -// UpdateExpressionToReady updates expression to ready. -func (aa *AgentAgregator) UpdateExpressionToReady( - ctx context.Context, - result int, - exprID int32, -) error { - const fn = "agentagregator.UpdateExpressionToReady" - - err := aa.dbConfig.DB.MakeExpressionReady( - ctx, - postgres.MakeExpressionReadyParams{ - ParseData: "", - Result: int32(result), - UpdatedAt: time.Now().UTC(), - ExpressionID: exprID, - }) - if err != nil { - return fmt.Errorf("can't make expression ready: %v, fn: %s", err, fn) - } - - return nil -} - -// ConsumeMessagesFromAgents consumes message from agents. -// If it is ping handle it with HandlePing method. -// If it is expression handle it with HandleExpression method. -func (aa *AgentAgregator) ConsumeMessagesFromAgents( - ctx context.Context, - msgFromAgents amqp.Delivery, - producer brokers.Producer, -) { - const fn = "agentagregator.ConsumeMessagesFromAgents" - - log := aa.log.With( - slog.String("fn", fn), - ) - - log.Info("agent agregator consumes message from agent", slog.String("msg", string(msgFromAgents.Body))) - - err := msgFromAgents.Ack(false) - if err != nil { - log.Error("error acknowledging message", sl.Err(err)) - aa.kill() - } - - var exprMsg messages.ExpressionMessage - if err := json.Unmarshal(msgFromAgents.Body, &exprMsg); err != nil { - log.Error("failed to parse JSON", sl.Err(err)) - aa.kill() - } - - if exprMsg.IsPing { - err := aa.HandlePing(ctx, exprMsg.AgentID) - if err != nil { - log.Error("agent agregator error", sl.Err(err)) - aa.kill() - } - } else { - err := aa.HandleExpression(ctx, exprMsg, producer) - if err != nil { - log.Error("", sl.Err(err)) - aa.kill() - } - } -} - -// ConsumeMessagesFromOrchestrator consumes message from orchestrator, -// get tokens from this message, -// publishing it to queue. -func (aa *AgentAgregator) ConsumeMessagesFromOrchestrator( - expressionMessage messages.MessageFromOrchestrator, - producer brokers.Producer, -) { - const fn = "agentagregator.ConsumeMessagesFromOrchestrator" - - aa.log.Info("agent agregator consumes message from orchestrator") - - tokens := parser.GetTokens(expressionMessage.Expression) - for _, token := range tokens { - err := producer.PublishExpressionMessage(&messages.ExpressionMessage{ - ExpressionID: expressionMessage.ExpressionID, - Token: token, - Expression: expressionMessage.Expression, - }) - if err != nil { - aa.log.Error("agent agregator error", sl.Err(err), slog.String("fn", fn)) - aa.kill() - } - } -} diff --git a/backend/internal/app/agent/app.go b/backend/internal/app/agent/app.go new file mode 100644 index 0000000..41484b4 --- /dev/null +++ b/backend/internal/app/agent/app.go @@ -0,0 +1,117 @@ +package agentapp + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/Prrromanssss/DAEE-fullstack/internal/agent" + "github.com/Prrromanssss/DAEE-fullstack/internal/config" + "github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers" + "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl" + "github.com/Prrromanssss/DAEE-fullstack/internal/rabbitmq" + "github.com/Prrromanssss/DAEE-fullstack/internal/storage" + "github.com/Prrromanssss/DAEE-fullstack/internal/storage/postgres" +) + +type App struct { + log *slog.Logger + AgentApp *agent.Agent + amqpConfig rabbitmq.AMQPConfig + Producer brokers.Producer + Consumer brokers.Consumer +} + +// MustRun runs Agent and panics if any error occurs. +func (a *App) MustRun(ctx context.Context) { + if err := a.Run(ctx); err != nil { + panic(err) + } +} + +// New creates new Agent app. +func New( + log *slog.Logger, + cfg *config.Config, + dbCfg *storage.Storage, + cancel context.CancelFunc, +) (*App, error) { + amqpCfg, err := rabbitmq.NewAMQPConfig(log, cfg.RabbitMQURL) + if err != nil { + log.Error("can't create NewAMQPConfig", sl.Err(err)) + return nil, err + } + + producer, err := rabbitmq.NewAMQPProducer(log, amqpCfg, cfg.QueueForResultsFromAgents) + if err != nil { + log.Error("can't create NewAMQPProducer", sl.Err(err)) + return nil, err + } + + consumer, err := rabbitmq.NewAMQPConsumer(log, amqpCfg, cfg.QueueForExpressionsToAgents) + if err != nil { + log.Error("can't create NewAMQPConsumer", sl.Err(err)) + return nil, err + } + + ag, err := agent.NewAgent( + log, + dbCfg, + postgres.Agent{}, + 200, + cancel, + ) + if err != nil { + log.Error("can't create agent", sl.Err(err)) + return nil, err + } + + return &App{ + log: log, + AgentApp: ag, + amqpConfig: *amqpCfg, + Producer: producer, + Consumer: consumer, + }, nil +} + +// Run gets messages from SimpleComputers, handle these messages, +// sends pings to Agent Agregator. +func (a *App) Run(ctx context.Context) error { + defer func() { + a.amqpConfig.Close() + a.Producer.Close() + a.Consumer.Close() + a.AgentApp.MakeExpressionsTerminated(ctx) + }() + + // Delete previous agents + err := a.AgentApp.DeletePreviousAgents(ctx) + if err != nil { + a.log.Error("can't delete previous agents", sl.Err(err)) + + return err + } + + go func() { + for msgFromAgentAgregator := range a.Consumer.GetMessages() { + go a.AgentApp.ConsumeMessageFromOrchestrator(ctx, msgFromAgentAgregator) + } + }() + + ticker := time.NewTicker(time.Duration(a.AgentApp.InactiveTime) * time.Second) + defer ticker.Stop() + + for { + select { + case result := <-a.AgentApp.SimpleComputers: + go a.AgentApp.ConsumeMessageFromComputers(ctx, result, a.Producer) + case <-ctx.Done(): + a.AgentApp.Terminate() + return fmt.Errorf("agent terminated") + case <-ticker.C: + a.AgentApp.Ping(a.Producer) + } + } +} diff --git a/backend/internal/app/orchestrator/app.go b/backend/internal/app/orchestrator/app.go new file mode 100644 index 0000000..d57c0b7 --- /dev/null +++ b/backend/internal/app/orchestrator/app.go @@ -0,0 +1,107 @@ +package orchestratorapp + +import ( + "context" + "log/slog" + + "github.com/Prrromanssss/DAEE-fullstack/internal/config" + "github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers" + "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl" + "github.com/Prrromanssss/DAEE-fullstack/internal/storage" + + "github.com/Prrromanssss/DAEE-fullstack/internal/orchestrator" + "github.com/Prrromanssss/DAEE-fullstack/internal/rabbitmq" +) + +type App struct { + log *slog.Logger + OrchestratorApp *orchestrator.Orchestrator + amqpConfig rabbitmq.AMQPConfig + Producer brokers.Producer + Consumer brokers.Consumer +} + +// MustRun runs Orchestrator and panics if any error occurs. +func (a *App) MustRun(ctx context.Context) { + if err := a.Run(ctx); err != nil { + panic(err) + } +} + +// New creates new Orchestrator app. +func New( + log *slog.Logger, + cfg *config.Config, + dbCfg *storage.Storage, + cancel context.CancelFunc, +) (*App, error) { + amqpCfg, err := rabbitmq.NewAMQPConfig(log, cfg.RabbitMQURL) + if err != nil { + log.Error("can't create NewAMQPConfig", sl.Err(err)) + return nil, err + } + + producer, err := rabbitmq.NewAMQPProducer(log, amqpCfg, cfg.QueueForExpressionsToAgents) + if err != nil { + log.Error("can't create NewAMQPProducer", sl.Err(err)) + return nil, err + } + + consumer, err := rabbitmq.NewAMQPConsumer(log, amqpCfg, cfg.QueueForResultsFromAgents) + if err != nil { + log.Error("can't create NewAMQPConsumer", sl.Err(err)) + return nil, err + } + + orc, err := orchestrator.NewOrchestrator( + log, + dbCfg, + cancel, + ) + if err != nil { + log.Error("orchestrator error", sl.Err(err)) + return nil, err + } + + return &App{ + log: log, + OrchestratorApp: orc, + amqpConfig: *amqpCfg, + Producer: producer, + Consumer: consumer, + }, nil +} + +// RunOrchestrator agregates agents, +// consumes messages from client, manages their job. +func (a *App) Run(ctx context.Context) error { + defer func() { + a.amqpConfig.Close() + a.Producer.Close() + a.Consumer.Close() + }() + + const fn = "orchestratorapp.Run" + + log := a.log.With( + slog.String("fn", fn), + ) + + err := a.OrchestratorApp.ReloadComputingExpressions(ctx, a.Producer) + if err != nil { + log.Error("can't reload computing expressions", sl.Err(err)) + + return err + } + + for { + select { + case msgFromAgents := <-a.Consumer.GetMessages(): + go a.OrchestratorApp.HandleMessagesFromAgents(ctx, msgFromAgents, a.Producer) + case <-ctx.Done(): + log.Error("orchestrator stopped") + + return err + } + } +} diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 179f9be..3f3b9a6 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -10,11 +10,12 @@ import ( ) type Config struct { - Env string `yaml:"env" env:"ENV" env-default:"local"` - LogPath string `yaml:"log_path" env-required:"true"` - DatabaseInstance `yaml:"database_instance" env-required:"true"` - RabbitQueue `yaml:"rabbit_queue" env-required:"true"` - HTTPServer `yaml:"http_server" env-required:"true"` + Env string `yaml:"env" env:"ENV" env-default:"local"` + LogPathAgent string `yaml:"log_path_agent" env-required:"true"` + LogPathOrchestrator string `yaml:"log_path_orchestrator" env-required:"true"` + DatabaseInstance `yaml:"database_instance" env-required:"true"` + RabbitQueue `yaml:"rabbit_queue" env-required:"true"` + HTTPServer `yaml:"http_server" env-required:"true"` } type HTTPServer struct { diff --git a/backend/internal/domain/messages/orchestrator_message.go b/backend/internal/domain/messages/orchestrator_message.go deleted file mode 100644 index f413913..0000000 --- a/backend/internal/domain/messages/orchestrator_message.go +++ /dev/null @@ -1,6 +0,0 @@ -package messages - -type MessageFromOrchestrator struct { - ExpressionID int32 `json:"expression_id"` - Expression string `json:"expression"` -} diff --git a/backend/internal/http-server/handlers/expression.go b/backend/internal/http-server/handlers/expression.go index a90a6c0..3c1d083 100644 --- a/backend/internal/http-server/handlers/expression.go +++ b/backend/internal/http-server/handlers/expression.go @@ -7,8 +7,9 @@ import ( "net/http" "time" - agentagregator "github.com/Prrromanssss/DAEE-fullstack/internal/agent_agregator" + "github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers" "github.com/Prrromanssss/DAEE-fullstack/internal/domain/messages" + "github.com/Prrromanssss/DAEE-fullstack/internal/orchestrator" "github.com/Prrromanssss/DAEE-fullstack/internal/orchestrator/parser" "github.com/Prrromanssss/DAEE-fullstack/internal/storage" @@ -19,7 +20,8 @@ import ( func HandlerCreateExpression( log *slog.Logger, dbCfg *storage.Storage, - agentAgr *agentagregator.AgentAgregator, + orc *orchestrator.Orchestrator, + producer brokers.Producer, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { const fn = "handlers.HandlerCreateExpression" @@ -59,12 +61,12 @@ func HandlerCreateExpression( return } - msgToQueue := messages.MessageFromOrchestrator{ + msgToQueue := messages.ExpressionMessage{ ExpressionID: expression.ExpressionID, Expression: parseData, } - agentAgr.AddTask(msgToQueue) + orc.AddTask(msgToQueue, producer) log.Info("send message from orchestrator to agent agregator") diff --git a/backend/internal/lib/logger/setup/logger.go b/backend/internal/lib/logger/setup/logger.go new file mode 100644 index 0000000..69f630c --- /dev/null +++ b/backend/internal/lib/logger/setup/logger.go @@ -0,0 +1,40 @@ +package setup + +import ( + "log/slog" + "os" +) + +const ( + envLocal = "local" + envDev = "dev" + envProd = "prod" +) + +func SetupLogger(env, logPath string) *slog.Logger { + var log *slog.Logger + + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + panic("failed to open log file: " + err.Error()) + } + defer logFile.Close() + + switch env { + case envLocal: + log = SetupPrettySlog(logFile) + case envDev: + log = slog.New( + slog.NewJSONHandler(logFile, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }), + ) + case envProd: + log = slog.New( + slog.NewJSONHandler(logFile, &slog.HandlerOptions{ + Level: slog.LevelInfo, + }), + ) + } + return log +} diff --git a/backend/internal/lib/logger/setup/pretty_logger.go b/backend/internal/lib/logger/setup/pretty_logger.go new file mode 100644 index 0000000..7252cd0 --- /dev/null +++ b/backend/internal/lib/logger/setup/pretty_logger.go @@ -0,0 +1,20 @@ +package setup + +import ( + "log/slog" + "os" + + "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/handlers/slogpretty" +) + +func SetupPrettySlog(logFile *os.File) *slog.Logger { + opts := slogpretty.PrettyHandlerOptions{ + SlogOpts: &slog.HandlerOptions{ + Level: slog.LevelDebug, + }, + } + + handler := opts.NewPrettyHandler(logFile) + + return slog.New(handler) +} diff --git a/backend/internal/orchestrator/orchestrator.go b/backend/internal/orchestrator/orchestrator.go index a8b18ad..3064e3a 100644 --- a/backend/internal/orchestrator/orchestrator.go +++ b/backend/internal/orchestrator/orchestrator.go @@ -1 +1,255 @@ package orchestrator + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strconv" + "sync" + "time" + + "github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers" + "github.com/Prrromanssss/DAEE-fullstack/internal/domain/messages" + "github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl" + "github.com/Prrromanssss/DAEE-fullstack/internal/orchestrator/parser" + "github.com/Prrromanssss/DAEE-fullstack/internal/storage" + "github.com/Prrromanssss/DAEE-fullstack/internal/storage/postgres" + + "github.com/streadway/amqp" +) + +type Orchestrator struct { + log *slog.Logger + dbConfig *storage.Storage + mu *sync.Mutex + kill context.CancelFunc +} + +// NewOrchestrator creates new Orchestrator. +func NewOrchestrator( + log *slog.Logger, + dbCfg *storage.Storage, + kill context.CancelFunc, +) (*Orchestrator, error) { + + return &Orchestrator{ + log: log, + dbConfig: dbCfg, + mu: &sync.Mutex{}, + kill: kill, + }, nil +} + +// AddTask publish message to agents. +func (o *Orchestrator) AddTask( + expressionMessage messages.ExpressionMessage, + producer brokers.Producer, +) { + const fn = "orchestrator.AddTask" + + o.log.Info("orchestrator ready to publish message to queue") + + tokens := parser.GetTokens(expressionMessage.Expression) + for _, token := range tokens { + err := producer.PublishExpressionMessage(&messages.ExpressionMessage{ + ExpressionID: expressionMessage.ExpressionID, + Token: token, + Expression: expressionMessage.Expression, + }) + if err != nil { + o.log.Error("can't publish token to queue", sl.Err(err), slog.String("fn", fn)) + // TODO: think about it. Should I kill orchestrator? + o.kill() + } + } +} + +// ReloadComputingExpressions add not completed expressions again to queue. +func (o *Orchestrator) ReloadComputingExpressions( + ctx context.Context, + producer brokers.Producer, +) error { + const fn = "orchestrator.ReloadComputingExpressions" + + expressions, err := o.dbConfig.DB.GetComputingExpressions(ctx) + if err != nil { + return fmt.Errorf("orhestrator Error: %v, fn: %s", err, fn) + } + + for _, expr := range expressions { + msgToQueue := messages.ExpressionMessage{ + ExpressionID: expr.ExpressionID, + Expression: expr.ParseData, + } + o.AddTask(msgToQueue, producer) + } + + return nil +} + +// HandlePing accepts ping from agent. +func (o *Orchestrator) HandlePing(ctx context.Context, agentID int32) error { + const fn = "orchestrator.HandlePing" + + err := o.dbConfig.DB.UpdateAgentLastPing( + ctx, + postgres.UpdateAgentLastPingParams{ + AgentID: agentID, + LastPing: time.Now().UTC(), + }) + if err != nil { + return fmt.Errorf("can't update last ping: %v, fn: %s", err, fn) + } + + return nil +} + +// HandleExpressionFromAgents makes expressions ready or publishes it again to queue. +func (o *Orchestrator) HandleExpression( + ctx context.Context, + exprMsg messages.ExpressionMessage, + producer brokers.Producer, +) error { + const fn = "orchestrator.HandleExpressionFromAgents" + + newResultAndToken, err := o.UpdateExpressionFromAgents(ctx, exprMsg) + if err != nil { + return fmt.Errorf("orchestrator error: %v, fn: %s", err, fn) + } + + result, err := strconv.Atoi(newResultAndToken.Result) + + if err == nil && + parser.IsNumber(newResultAndToken.Result) || + (newResultAndToken.Result[0] == '-' && parser.IsNumber(newResultAndToken.Result[1:])) { + err := o.UpdateExpressionToReady(ctx, result, exprMsg.ExpressionID) + if err != nil { + return fmt.Errorf("orchestrator error: %v, fn: %s", err, fn) + } + + return nil + } + if newResultAndToken.Token != "" { + err := producer.PublishExpressionMessage(&messages.ExpressionMessage{ + ExpressionID: exprMsg.ExpressionID, + Token: newResultAndToken.Token, + Expression: newResultAndToken.Result, + }) + if err != nil { + return fmt.Errorf("orchestrator error: %v, fn: %s", err, fn) + } + } + + return nil +} + +// UpdateExpressionFromAgents parses expression with new token and updates it in the database. +func (o *Orchestrator) UpdateExpressionFromAgents( + ctx context.Context, + exprMsg messages.ExpressionMessage, +) (messages.ResultAndTokenMessage, error) { + const fn = "orchestrator.UpdateExpressionFromAgents" + + expression, err := o.dbConfig.DB.GetExpressionByID( + ctx, + exprMsg.ExpressionID, + ) + if err != nil { + return messages.ResultAndTokenMessage{}, + fmt.Errorf("can't get expression by id: %v, fn: %s", err, fn) + } + + resAndTokenMsg, err := parser.InsertResultToToken( + expression.ParseData, + exprMsg.Token, + exprMsg.Result, + ) + if err != nil { + return messages.ResultAndTokenMessage{}, + fmt.Errorf("can't insert tokens to expression: %v, fn: %s", err, fn) + } + + err = o.dbConfig.DB.UpdateExpressionParseData( + ctx, + postgres.UpdateExpressionParseDataParams{ + ExpressionID: exprMsg.ExpressionID, + ParseData: resAndTokenMsg.Result, + }) + if err != nil { + return messages.ResultAndTokenMessage{}, + fmt.Errorf("can't update expression data: %v, fn: %s", err, fn) + } + + return resAndTokenMsg, nil +} + +// UpdateExpressionToReady updates expression to ready. +func (o *Orchestrator) UpdateExpressionToReady( + ctx context.Context, + result int, + exprID int32, +) error { + const fn = "orchestrator.UpdateExpressionToReady" + + err := o.dbConfig.DB.MakeExpressionReady( + ctx, + postgres.MakeExpressionReadyParams{ + ParseData: "", + Result: int32(result), + UpdatedAt: time.Now().UTC(), + ExpressionID: exprID, + }) + if err != nil { + return fmt.Errorf("can't make expression ready: %v, fn: %s", err, fn) + } + + return nil +} + +// HandleMessagesFromAgents consumes message from agents. +// If it is ping handle it with HandlePing method. +// If it is expression handle it with HandleExpression method. +func (o *Orchestrator) HandleMessagesFromAgents( + ctx context.Context, + msgFromAgents amqp.Delivery, + producer brokers.Producer, +) { + const fn = "orchestrator.ConsumeMessagesFromAgents" + + log := o.log.With( + slog.String("fn", fn), + ) + + log.Info("orchestrator consumes message from agent", slog.String("msg", string(msgFromAgents.Body))) + + err := msgFromAgents.Ack(false) + if err != nil { + log.Error("error acknowledging message", sl.Err(err)) + // TODO: think about it. Should I kill orchestrator? + o.kill() + } + + var exprMsg messages.ExpressionMessage + if err := json.Unmarshal(msgFromAgents.Body, &exprMsg); err != nil { + log.Error("failed to parse JSON", sl.Err(err)) + // TODO: think about it. Should I kill orchestrator? + o.kill() + } + + if exprMsg.IsPing { + err := o.HandlePing(ctx, exprMsg.AgentID) + if err != nil { + log.Error("orchestrator error", sl.Err(err)) + // TODO: think about it. Should I kill orchestrator? + o.kill() + } + } else { + err := o.HandleExpression(ctx, exprMsg, producer) + if err != nil { + log.Error("", sl.Err(err)) + // TODO: think about it. Should I kill orchestrator? + o.kill() + } + } +}