Skip to content

Commit

Permalink
Merge pull request #1 from Prrromanssss/agentagregator_change_to_orch…
Browse files Browse the repository at this point in the history
…estrator

reorganize structure
  • Loading branch information
Prrromanssss authored Apr 17, 2024
2 parents 55524c7 + d4f47e5 commit d928b5c
Show file tree
Hide file tree
Showing 14 changed files with 618 additions and 550 deletions.
106 changes: 24 additions & 82 deletions backend/cmd/agent/main.go
Original file line number Diff line number Diff line change
@@ -1,101 +1,43 @@
package agentservice
package main

import (
"context"
"log/slog"
"time"

ag "github.com/Prrromanssss/DAEE-fullstack/internal/agent"
agentapp "github.com/Prrromanssss/DAEE-fullstack/internal/app/agent"
"github.com/Prrromanssss/DAEE-fullstack/internal/config"
"github.com/Prrromanssss/DAEE-fullstack/internal/domain/brokers"
"github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl"
"github.com/Prrromanssss/DAEE-fullstack/internal/rabbitmq"
"github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/logcleaner"
"github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/setup"
"github.com/Prrromanssss/DAEE-fullstack/internal/storage"
"github.com/Prrromanssss/DAEE-fullstack/internal/storage/postgres"
)

// RunAgent makes all requirements needed to run agent and call AgentService to run it.
func RunAgent(log *slog.Logger, cfg *config.Config, dbCfg *storage.Storage) {
const fn = "agentservice.RunAgent"

log = log.With(
slog.String("fn", fn),
)

func main() {
ctxWithCancel, cancel := context.WithCancel(context.Background())
defer cancel()

amqpCfg, err := rabbitmq.NewAMQPConfig(log, cfg.RabbitMQURL)
if err != nil {
cancel()
log.Error("can't create NewAMQPConfig", sl.Err(err))
return
}

producer, err := rabbitmq.NewAMQPProducer(log, amqpCfg, cfg.QueueForResultsFromAgents)
if err != nil {
cancel()
log.Error("can't create NewAMQPProducer", sl.Err(err))
return
}

consumer, err := rabbitmq.NewAMQPConsumer(log, amqpCfg, cfg.QueueForExpressionsToAgents)
if err != nil {
cancel()
log.Error("can't create NewAMQPConsumer", sl.Err(err))
return
}
// Load Config
cfg := config.MustLoad()

agent, err := ag.NewAgent(
log,
dbCfg,
postgres.Agent{},
200,
cancel,
// Configuration Logger
log := setup.SetupLogger(cfg.Env, cfg.LogPathAgent)
log.Info(
"start agent",
slog.String("env", cfg.Env),
slog.String("version", "2"),
)
if err != nil {
cancel()
log.Error("can't create agent", sl.Err(err))
return
}
log.Debug("debug messages are enabled")

go AgentService(ctxWithCancel, log, amqpCfg, producer, consumer, agent)
}
go logcleaner.CleanLog(10*time.Minute, cfg.LogPathAgent, 100)

// AgentService gets messages from SimpleComputers, handle these messages,
// sends pings to Agent Agregator.
func AgentService(
ctx context.Context,
log *slog.Logger,
amqpCfg *rabbitmq.AMQPConfig,
producer brokers.Producer,
consumer brokers.Consumer,
agent *ag.Agent,
) {
defer func() {
amqpCfg.Close()
producer.Close()
consumer.Close()
agent.MakeExpressionsTerminated(ctx)
}()
// Configuration Storage
dbCfg := storage.NewStorage(cfg.StorageURL)

go func() {
for msgFromAgentAgregator := range consumer.GetMessages() {
go agent.ConsumeMessageFromAgentAgregator(ctx, msgFromAgentAgregator)
}
}()

ticker := time.NewTicker(time.Duration(agent.InactiveTime) * time.Second)
defer ticker.Stop()

for {
select {
case result := <-agent.SimpleComputers:
go agent.ConsumeMessageFromComputers(ctx, result, producer)
case <-ctx.Done():
agent.Terminate()
return
case <-ticker.C:
agent.Ping(producer)
}
// Configuration Agent
application, err := agentapp.New(log, cfg, dbCfg, cancel)
if err != nil {
panic(err)
}

go application.MustRun(ctxWithCancel)
}
108 changes: 0 additions & 108 deletions backend/cmd/agent_agregator/main.go

This file was deleted.

103 changes: 23 additions & 80 deletions backend/cmd/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,51 @@ import (
"context"
"log/slog"
"net/http"
"os"
"time"

agentservice "github.com/Prrromanssss/DAEE-fullstack/cmd/agent"
agentagregatorservice "github.com/Prrromanssss/DAEE-fullstack/cmd/agent_agregator"
orchestratorapp "github.com/Prrromanssss/DAEE-fullstack/internal/app/orchestrator"

"github.com/Prrromanssss/DAEE-fullstack/internal/config"
"github.com/Prrromanssss/DAEE-fullstack/internal/http-server/handlers"
"github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/handlers/slogpretty"
"github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/logcleaner"
"github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/setup"
"github.com/Prrromanssss/DAEE-fullstack/internal/lib/logger/sl"
"github.com/Prrromanssss/DAEE-fullstack/internal/storage"

"github.com/go-chi/chi"
"github.com/go-chi/cors"
)

const (
envLocal = "local"
envDev = "dev"
envProd = "prod"
)

func main() {
// Load config
ctxWithCancel, cancel := context.WithCancel(context.Background())
defer cancel()

// Load Config
cfg := config.MustLoad()

// Configuration logger
log := setupLogger(cfg.Env, cfg.LogPath)
// Configuration Logger
log := setup.SetupLogger(cfg.Env, cfg.LogPathOrchestrator)
log.Info(
"start daee",
"start orchestrator",
slog.String("env", cfg.Env),
slog.String("version", "2"),
)
log.Debug("debug messages are enabled")

go logcleaner.CleanLog(10*time.Minute, cfg.LogPath, 100)

// Configuration storage
go logcleaner.CleanLog(10*time.Minute, cfg.LogPathOrchestrator, 100)

// Configuration Storage
dbCfg := storage.NewStorage(cfg.StorageURL)

// Configuration AgentAgregator
agentAgr, err := agentagregatorservice.RunAgentAgregator(log, cfg, dbCfg)
// Configuration Orchestrator
application, err := orchestratorapp.New(log, cfg, dbCfg, cancel)
if err != nil {
log.Error("can't make agent agregator", sl.Err(err))
return
panic(err)
}

// Delete previous agents
err = dbCfg.DB.DeleteAgents(context.Background())
if err != nil {
log.Error("can't delete previous agents", sl.Err(err))
}
go application.MustRun(ctxWithCancel)

// Create Agent1
agentservice.RunAgent(log, cfg, dbCfg)

// Create Agent2
agentservice.RunAgent(log, cfg, dbCfg)

// Create Agent3
agentservice.RunAgent(log, cfg, dbCfg)

// Configuration http-server
// Configuration HTTP-Server
router := chi.NewRouter()

router.Use(cors.Handler(cors.Options{
Expand All @@ -82,13 +62,16 @@ func main() {

v1Router := chi.NewRouter()

// TODO: Get rid of agentAgr in endpoint!!!!!!!!

// Expression endpoints
v1Router.Post("/expressions", handlers.HandlerCreateExpression(log, dbCfg, agentAgr)) // <<<<-----
v1Router.Post("/expressions", handlers.HandlerCreateExpression(
log,
dbCfg,
application.OrchestratorApp,
application.Producer,
))
v1Router.Get("/expressions", handlers.HandlerGetExpressions(log, dbCfg))

// Opeartsion endpoints
// Operation endpoints
v1Router.Get("/operations", handlers.HandlerGetOperations(log, dbCfg))
v1Router.Patch("/operations", handlers.HandlerUpdateOperation(log, dbCfg))

Expand All @@ -112,43 +95,3 @@ func main() {

log.Info("server stopped")
}

func setupLogger(env, logPath string) *slog.Logger {
var log *slog.Logger

logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
panic("failed to open log file: " + err.Error())
}
defer logFile.Close()

switch env {
case envLocal:
log = setupPrettySlog(logFile)
case envDev:
log = slog.New(
slog.NewJSONHandler(logFile, &slog.HandlerOptions{
Level: slog.LevelDebug,
}),
)
case envProd:
log = slog.New(
slog.NewJSONHandler(logFile, &slog.HandlerOptions{
Level: slog.LevelInfo,
}),
)
}
return log
}

func setupPrettySlog(logFile *os.File) *slog.Logger {
opts := slogpretty.PrettyHandlerOptions{
SlogOpts: &slog.HandlerOptions{
Level: slog.LevelDebug,
},
}

handler := opts.NewPrettyHandler(logFile)

return slog.New(handler)
}
3 changes: 2 additions & 1 deletion backend/config/local.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
env: "local"
log_path: "./backend/daee.log"
log_path_agent: "./backend/agent.log"
log_path_orchestrator: "./backend/orchestrator.log"
rabbit_queue:
rabbitmq_url: "amqp://guest:guest@localhost:5672/"
queue_for_expressions_to_agents: "Expressions to agents"
Expand Down
Loading

0 comments on commit d928b5c

Please sign in to comment.