Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new feature] promtail: Add config reload endoint / signal to promtail #7247

Merged
merged 30 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0c7e77c
[new feature] promtail: support /reload config
liguozhong Sep 26, 2022
f7d7244
fix lint
liguozhong Sep 26, 2022
b9c89d5
lint
liguozhong Sep 26, 2022
0b7e536
lint
liguozhong Sep 26, 2022
07a00bd
lint
liguozhong Sep 26, 2022
9e82c9f
lint
liguozhong Sep 26, 2022
4bdf6cd
lint
liguozhong Sep 26, 2022
ce7ddb0
lint
liguozhong Sep 26, 2022
e01b8bd
lint,and *pointer
liguozhong Sep 27, 2022
4d6ee77
add promtail reload test
liguozhong Sep 27, 2022
69489b9
lint
liguozhong Sep 27, 2022
646b120
Update clients/pkg/promtail/promtail.go
liguozhong Sep 27, 2022
ebc0e35
fix review tip
liguozhong Sep 27, 2022
df6f15d
wrap server error
liguozhong Sep 27, 2022
f838787
do not panic
liguozhong Sep 27, 2022
3074a1e
do not panic when reload config fail.
liguozhong Sep 27, 2022
9bd469f
add reload metrics and test
liguozhong Sep 27, 2022
ab2dd37
delete cnt int
liguozhong Sep 27, 2022
6bc47e3
do not panic,add more detail log for watchConfig
liguozhong Sep 27, 2022
6651e62
do not reload when config not changed
liguozhong Sep 27, 2022
2817f60
lint
liguozhong Sep 27, 2022
a2809cd
lint
liguozhong Sep 27, 2022
b980753
lint
liguozhong Sep 27, 2022
41b584f
add test for promtailServer.configLoaded field
liguozhong Sep 28, 2022
5e2dd98
Update clients/cmd/promtail/main.go
liguozhong Oct 10, 2022
489f9cf
fix review tips
liguozhong Oct 10, 2022
5582e95
Merge branch 'main' into promtail_reload
liguozhong Oct 10, 2022
dba1290
trigger ci agent
liguozhong Oct 10, 2022
3928fff
replace errors.Wrap with std fmt.Errorf
liguozhong Oct 11, 2022
3396a6f
Merge branch 'main' into promtail_reload
liguozhong Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"reflect"
"sync"

// embed time zone data
_ "time/tzdata"
Expand All @@ -20,28 +21,31 @@ import (
"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail"
"github.com/grafana/loki/clients/pkg/promtail/client"
"github.com/grafana/loki/clients/pkg/promtail/config"
promtail_config "github.com/grafana/loki/clients/pkg/promtail/config"

"github.com/grafana/loki/pkg/util"
_ "github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/cfg"

_ "github.com/grafana/loki/pkg/util/build"
util_log "github.com/grafana/loki/pkg/util/log"
)

func init() {
prometheus.MustRegister(version.NewCollector("promtail"))
}

var mtx sync.Mutex

type Config struct {
config.Config `yaml:",inline"`
printVersion bool
printConfig bool
logConfig bool
dryRun bool
checkSyntax bool
configFile string
configExpandEnv bool
inspect bool
promtail_config.Config `yaml:",inline"`
printVersion bool
printConfig bool
logConfig bool
dryRun bool
checkSyntax bool
configFile string
configExpandEnv bool
inspect bool
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -68,11 +72,11 @@ func (c *Config) Clone() flagext.Registerer {
func main() {
// Load config, merging config file and CLI flags
var config Config
if err := cfg.DefaultUnmarshal(&config, os.Args[1:], flag.CommandLine); err != nil {
args := os.Args[1:]
if err := cfg.DefaultUnmarshal(&config, args, flag.CommandLine); err != nil {
fmt.Println("Unable to parse config:", err)
os.Exit(1)
}

if config.checkSyntax {
if config.configFile == "" {
fmt.Println("Invalid config file")
Expand Down Expand Up @@ -123,7 +127,17 @@ func main() {
}

clientMetrics := client.NewMetrics(prometheus.DefaultRegisterer, config.Config.Options.StreamLagLabels)
p, err := promtail.New(config.Config, clientMetrics, config.dryRun)
newConfigFunc := func() (*promtail_config.Config, error) {
mtx.Lock()
defer mtx.Unlock()
var config Config
if err := cfg.DefaultUnmarshal(&config, args, flag.NewFlagSet(os.Args[0], flag.ExitOnError)); err != nil {
fmt.Println("Unable to parse config:", err)
return nil, fmt.Errorf("unable to parse config: %w", err)
}
return &config.Config, nil
}
p, err := promtail.New(config.Config, newConfigFunc, clientMetrics, config.dryRun)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error creating promtail", "error", err)
os.Exit(1)
Expand Down
153 changes: 134 additions & 19 deletions clients/pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package promtail

import (
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/logentry/stages"
Expand All @@ -16,6 +22,20 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
)

var reloadSuccessTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "config_reload_success_total",
Help: "Number of reload success times.",
})

var reloadFailTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "config_reload_fail_total",
Help: "Number of reload fail times.",
})

var errConfigNotChange = errors.New("config has not changed")

// Option is a function that can be passed to the New method of Promtail and
// customize the Promtail that is created.
type Option func(p *Promtail)
Expand All @@ -42,17 +62,32 @@ type Promtail struct {
logger log.Logger
reg prometheus.Registerer

stopped bool
mtx sync.Mutex
stopped bool
mtx sync.Mutex
configLoaded string
newConfig func() (*config.Config, error)
metrics *client.Metrics
dryRun bool
}

// New makes a new Promtail.
func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
func New(cfg config.Config, newConfig func() (*config.Config, error), metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
// Initialize promtail with some defaults and allow the options to override
// them.

promtail := &Promtail{
logger: util_log.Logger,
reg: prometheus.DefaultRegisterer,
logger: util_log.Logger,
reg: prometheus.DefaultRegisterer,
metrics: metrics,
dryRun: dryRun,
}
err := promtail.reg.Register(reloadSuccessTotal)
if err != nil {
return nil, fmt.Errorf("error register prometheus collector reloadSuccessTotal :%w", err)
}
err = promtail.reg.Register(reloadFailTotal)
if err != nil {
return nil, fmt.Errorf("error register prometheus collector reloadFailTotal :%w", err)
}
for _, o := range opts {
// todo (callum) I don't understand why I needed to add this check
Expand All @@ -61,37 +96,71 @@ func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option
}
o(promtail)
}
err = promtail.reloadConfig(&cfg)
if err != nil {
return nil, err
}
server, err := server.New(cfg.ServerConfig, promtail.logger, promtail.targetManagers, cfg.String())
if err != nil {
return nil, fmt.Errorf("error creating loki server: %w", err)
}
promtail.server = server
promtail.newConfig = newConfig

cfg.Setup(promtail.logger)
return promtail, nil
}

func (p *Promtail) reloadConfig(cfg *config.Config) error {
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
level.Debug(p.logger).Log("msg", "Reloading configuration file")
p.mtx.Lock()
defer p.mtx.Unlock()
newConfigFile := cfg.String()
if newConfigFile == p.configLoaded {
return errConfigNotChange
}
newConf := cfg.String()
level.Info(p.logger).Log("msg", "Reloading configuration file", "newConf", newConf)
if p.targetManagers != nil {
p.targetManagers.Stop()
}
if p.client != nil {
p.client.Stop()
}

cfg.Setup(p.logger)
if cfg.LimitsConfig.ReadlineRateEnabled {
stages.SetReadLineRateLimiter(cfg.LimitsConfig.ReadlineRate, cfg.LimitsConfig.ReadlineBurst, cfg.LimitsConfig.ReadlineRateDrop)
}
var err error
if dryRun {
promtail.client, err = client.NewLogger(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.ClientConfigs...)
if p.dryRun {
p.client, err = client.NewLogger(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
return err
}
cfg.PositionsConfig.ReadOnly = true
} else {
promtail.client, err = client.NewMulti(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
p.client, err = client.NewMulti(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
if err != nil {
return nil, err
return err
}
}

tms, err := targets.NewTargetManagers(promtail, promtail.reg, promtail.logger, cfg.PositionsConfig, promtail.client, cfg.ScrapeConfig, &cfg.TargetConfig)
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.client, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
return nil, err
return err
}
promtail.targetManagers = tms
server, err := server.New(cfg.ServerConfig, promtail.logger, tms, cfg.String())
if err != nil {
return nil, err
p.targetManagers = tms

promServer := p.server
if promServer != nil {
promtailServer, ok := promServer.(*server.PromtailServer)
if !ok {
return errors.New("promtailServer cast fail")
}
promtailServer.ReloadServer(p.targetManagers, cfg.String())
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
}
promtail.server = server
return promtail, nil
p.configLoaded = newConf
return nil
}

// Run the promtail; will block until a signal is received.
Expand All @@ -103,6 +172,7 @@ func (p *Promtail) Run() error {
return nil
}
p.mtx.Unlock() // unlock before blocking
go p.watchConfig()
return p.server.Run()
}

Expand Down Expand Up @@ -133,3 +203,48 @@ func (p *Promtail) Shutdown() {
func (p *Promtail) ActiveTargets() map[string][]target.Target {
return p.targetManagers.ActiveTargets()
}

func (p *Promtail) watchConfig() {
// Reload handler.
// Make sure that sighup handler is registered with a redirect to the channel before the potentially
if p.newConfig == nil {
level.Warn(p.logger).Log("msg", "disable watchConfig", "reason", "Promtail newConfig func is Empty")
return
}
promtailServer, ok := p.server.(*server.PromtailServer)
if !ok {
level.Warn(p.logger).Log("msg", "disable watchConfig", "reason", "promtailServer cast fail")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log message won't mean much to the user. Can we return the actual parse error here?

return
}
level.Warn(p.logger).Log("msg", "enable watchConfig")
hup := make(chan os.Signal, 1)
signal.Notify(hup, syscall.SIGHUP)
for {
select {
case <-hup:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, not super familiar with these live reload patterns, can you explain the SIGHUP flow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refer to the reload documentation of prometheus, we should also provide this reload way.

https://prometheus.io/docs/prometheus/latest/configuration/configuration/
A configuration reload is triggered by sending a SIGHUP.
image

_ = p.reload()
case rc := <-promtailServer.Reload():
if err := p.reload(); err != nil {
rc <- err
} else {
rc <- nil
}
}
}
}

func (p *Promtail) reload() error {
cfg, err := p.newConfig()
if err != nil {
reloadFailTotal.Inc()
return fmt.Errorf("Error new Config: %w", err)
}
err = p.reloadConfig(cfg)
if err != nil {
reloadFailTotal.Inc()
level.Error(p.logger).Log("msg", "Error reloading config", "err", err)
return err
}
reloadSuccessTotal.Inc()
return nil
}
Loading