-
Notifications
You must be signed in to change notification settings - Fork 369
/
Copy pathrunner.go
94 lines (82 loc) · 2.48 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.
package runner
import (
"context"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/infrastructure"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
)
// Config bundles the inputs of the infrastructure runner: the server
// configuration plus the message handles the runner subscribes to.
type Config struct {
// Server is embedded; the runner uses its Logger and EnvoyGateway fields
// and passes it to infrastructure.NewManager.
config.Server
// InfraIR is the source of proxy infra updates the runner subscribes to.
InfraIR *message.InfraIR
// RateLimitInfraIR is the source of rate limit infra updates. The runner
// subscribes to it only when EnvoyGateway.RateLimit is non-nil.
RateLimitInfraIR *message.RateLimitInfraIR
}
// Runner reacts to infra IR updates by creating, updating, or deleting
// infrastructure through an infrastructure.Manager.
type Runner struct {
// Config is embedded by value; New copies it from the caller's *Config.
Config
// mgr is assigned in Start from infrastructure.NewManager and is used by
// the subscription handlers.
mgr infrastructure.Manager
}
// Name returns the fixed identifier of this runner, "infrastructure".
func (r *Runner) Name() string {
	const runnerName = "infrastructure"
	return runnerName
}
// New builds a Runner that holds a copy of the Config cfg points to.
// The manager is not created here; it is set up later by Start.
func New(cfg *Config) *Runner {
	runner := new(Runner)
	runner.Config = *cfg
	return runner
}
// Start tags the runner's logger with the runner name, creates the
// infrastructure manager, and launches the subscription goroutines.
//
// It returns the error from infrastructure.NewManager (after logging it) if
// the manager cannot be created; otherwise it returns nil without waiting for
// the goroutines it started.
func (r *Runner) Start(ctx context.Context) error {
	r.Logger = r.Logger.WithValues("runner", r.Name())

	manager, err := infrastructure.NewManager(&r.Config.Server)
	// Store the result before inspecting err so r.mgr is assigned either way.
	r.mgr = manager
	if err != nil {
		r.Logger.Error(err, "failed to create new manager")
		return err
	}

	go r.subscribeToProxyInfraIR(ctx)

	// The rate limit infra IR is only watched when rate limiting is configured.
	if rateLimitEnabled := r.EnvoyGateway.RateLimit != nil; rateLimitEnabled {
		go r.subscribeToRateLimitInfraIR(ctx)
	}

	r.Logger.Info("started")
	return nil
}
// subscribeToProxyInfraIR subscribes to InfraIR and hands every update to the
// manager: updates flagged Delete go to DeleteProxyInfra, all others to
// CreateOrUpdateProxyInfra. Manager errors are logged and not propagated.
// A shutdown message is logged once message.HandleSubscription returns.
func (r *Runner) subscribeToProxyInfraIR(ctx context.Context) {
	onUpdate := func(update message.Update[string, *ir.Infra]) {
		infra := update.Value

		// Anything that is not a delete is treated as create-or-update.
		if !update.Delete {
			if err := r.mgr.CreateOrUpdateProxyInfra(ctx, infra); err != nil {
				r.Logger.Error(err, "failed to create new infra")
			}
			return
		}

		if err := r.mgr.DeleteProxyInfra(ctx, infra); err != nil {
			r.Logger.Error(err, "failed to delete infra")
		}
	}

	message.HandleSubscription(r.InfraIR.Subscribe(ctx), onUpdate)
	r.Logger.Info("infra subscriber shutting down")
}
// subscribeToRateLimitInfraIR subscribes to RateLimitInfraIR and hands every
// update to the manager: updates flagged Delete go to DeleteRateLimitInfra,
// all others to CreateOrUpdateRateLimitInfra. Manager errors are logged and
// not propagated. A shutdown message is logged once
// message.HandleSubscription returns.
func (r *Runner) subscribeToRateLimitInfraIR(ctx context.Context) {
	onUpdate := func(update message.Update[string, *ir.RateLimitInfra]) {
		rateLimitInfra := update.Value

		// Anything that is not a delete is treated as create-or-update.
		if !update.Delete {
			if err := r.mgr.CreateOrUpdateRateLimitInfra(ctx, rateLimitInfra); err != nil {
				r.Logger.Error(err, "failed to create new rate limit infra")
			}
			return
		}

		if err := r.mgr.DeleteRateLimitInfra(ctx, rateLimitInfra); err != nil {
			r.Logger.Error(err, "failed to delete rate limit infra")
		}
	}

	message.HandleSubscription(r.RateLimitInfraIR.Subscribe(ctx), onUpdate)
	r.Logger.Info("ratelimit infra subscriber shutting down")
}