diff --git a/cmd/promtail/main.go b/cmd/promtail/main.go index 36264575f2e15..263d84f3e7da7 100644 --- a/cmd/promtail/main.go +++ b/cmd/promtail/main.go @@ -64,11 +64,10 @@ func main() { } level.Info(util.Logger).Log("msg", "Starting Promtail", "version", version.Info()) + defer p.Shutdown() if err := p.Run(); err != nil { level.Error(util.Logger).Log("msg", "error starting promtail", "error", err) os.Exit(1) } - - p.Shutdown() } diff --git a/docs/clients/promtail/troubleshooting.md b/docs/clients/promtail/troubleshooting.md index ab590871b3143..58c98e2272698 100644 --- a/docs/clients/promtail/troubleshooting.md +++ b/docs/clients/promtail/troubleshooting.md @@ -3,6 +3,53 @@ This document describes known failure modes of `promtail` on edge cases and the adopted trade-offs. +## Pipe data to Promtail + +Promtail supports piping data for sending logs to Loki. This is a very useful way to troubleshooting your configuration. +Once you have promtail installed you can for instance use the following command to send logs to a local Loki instance: + +```bash +cat my.log | promtail --client.url http://127.0.0.1:3100/loki/api/v1/push +``` + +You can also add additional labels from command line using: + +```bash +cat my.log | promtail --client.url http://127.0.0.1:3100/loki/api/v1/push --client.external-labels=k1=v1,k2=v2 +``` + +This will add labels `k1` and `k2` with respective values `v1` and `v2`. + +In pipe mode Promtail also support file configuration using `--config.file`, however do note that positions config is not used and +only **the first scrape config is used**. + +[`static_configs:`](./configuration) can be used to provide static labels, although the targets property is ignored. + +If you don't provide any [`scrape_config:`](./configuration#scrape_config) a default one is used which will automatically adds the following default labels: `{job="stdin",hostname=""}`. + +For example you could use this config below to parse and add the label `level` on all your piped logs: + +```yaml +clients: + - url: http://localhost:3100/loki/api/v1/push + +scrape_configs: +- job_name: system + pipeline_stages: + - regex: + expression: '(level|lvl|severity)=(?P\\w+)' + - labels: + level: + static_configs: + - labels: + job: my-stdin-logs +``` + +``` +cat my.log | promtail --config.file promtail.yaml +``` + + ## A tailed file is truncated while `promtail` is not running Given the following order of events: diff --git a/pkg/promtail/promtail.go b/pkg/promtail/promtail.go index 3fae7132c7083..a3c53c9157a33 100644 --- a/pkg/promtail/promtail.go +++ b/pkg/promtail/promtail.go @@ -1,11 +1,12 @@ package promtail import ( + "sync" + "github.com/cortexproject/cortex/pkg/util" "github.com/grafana/loki/pkg/promtail/client" "github.com/grafana/loki/pkg/promtail/config" - "github.com/grafana/loki/pkg/promtail/positions" "github.com/grafana/loki/pkg/promtail/server" "github.com/grafana/loki/pkg/promtail/targets" ) @@ -13,17 +14,15 @@ import ( // Promtail is the root struct for Promtail... type Promtail struct { client client.Client - positions *positions.Positions targetManagers *targets.TargetManagers server *server.Server + + stopped bool + mtx sync.Mutex } // New makes a new Promtail. func New(cfg config.Config) (*Promtail, error) { - positions, err := positions.New(util.Logger, cfg.PositionsConfig) - if err != nil { - return nil, err - } if cfg.ClientConfig.URL.URL != nil { // if a single client config is used we add it to the multiple client config for backward compatibility @@ -35,33 +34,45 @@ func New(cfg config.Config) (*Promtail, error) { return nil, err } - tms, err := targets.NewTargetManagers(util.Logger, positions, client, cfg.ScrapeConfig, &cfg.TargetConfig) + promtail := &Promtail{ + client: client, + } + + tms, err := targets.NewTargetManagers(promtail, util.Logger, cfg.PositionsConfig, client, cfg.ScrapeConfig, &cfg.TargetConfig) if err != nil { return nil, err } - + promtail.targetManagers = tms server, err := server.New(cfg.ServerConfig, tms) if err != nil { return nil, err } - - return &Promtail{ - client: client, - positions: positions, - targetManagers: tms, - server: server, - }, nil + promtail.server = server + return promtail, nil } // Run the promtail; will block until a signal is received. func (p *Promtail) Run() error { + p.mtx.Lock() + // if we stopped promtail before the server even started we can return without starting. + if p.stopped { + p.mtx.Unlock() + return nil + } + p.mtx.Unlock() // unlock before blocking return p.server.Run() } // Shutdown the promtail. func (p *Promtail) Shutdown() { - p.server.Shutdown() - p.targetManagers.Stop() - p.positions.Stop() + p.mtx.Lock() + defer p.mtx.Unlock() + p.stopped = true + if p.server != nil { + p.server.Shutdown() + } + if p.targetManagers != nil { + p.targetManagers.Stop() + } p.client.Stop() } diff --git a/pkg/promtail/targets/manager.go b/pkg/promtail/targets/manager.go index 118de6ea6aab5..87020a2f70908 100644 --- a/pkg/promtail/targets/manager.go +++ b/pkg/promtail/targets/manager.go @@ -1,6 +1,7 @@ package targets import ( + "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log" "github.com/pkg/errors" @@ -19,12 +20,14 @@ type targetManager interface { // TargetManagers manages a list of target managers. type TargetManagers struct { targetManagers []targetManager + positions *positions.Positions } // NewTargetManagers makes a new TargetManagers func NewTargetManagers( + app Shutdownable, logger log.Logger, - positions *positions.Positions, + positionsConfig positions.Config, client api.EntryHandler, scrapeConfigs []scrape.Config, targetConfig *Config, @@ -34,6 +37,20 @@ func NewTargetManagers( var journalScrapeConfigs []scrape.Config var syslogScrapeConfigs []scrape.Config + if isStdinPipe() { + stdin, err := newStdinTargetManager(app, client, scrapeConfigs) + if err != nil { + return nil, err + } + targetManagers = append(targetManagers, stdin) + return &TargetManagers{targetManagers: targetManagers}, nil + } + + positions, err := positions.New(util.Logger, positionsConfig) + if err != nil { + return nil, err + } + for _, cfg := range scrapeConfigs { if cfg.HasServiceDiscoveryConfig() { fileScrapeConfigs = append(fileScrapeConfigs, cfg) @@ -84,7 +101,10 @@ func NewTargetManagers( targetManagers = append(targetManagers, syslogTargetManager) } - return &TargetManagers{targetManagers: targetManagers}, nil + return &TargetManagers{ + targetManagers: targetManagers, + positions: positions, + }, nil } @@ -125,4 +145,7 @@ func (tm *TargetManagers) Stop() { for _, t := range tm.targetManagers { t.Stop() } + if tm.positions != nil { + tm.positions.Stop() + } } diff --git a/pkg/promtail/targets/stdin_target_manager.go b/pkg/promtail/targets/stdin_target_manager.go new file mode 100644 index 0000000000000..d14a64b12b84b --- /dev/null +++ b/pkg/promtail/targets/stdin_target_manager.go @@ -0,0 +1,168 @@ +package targets + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/logentry/stages" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/grafana/loki/pkg/promtail/scrape" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/config" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +// bufferSize is the size of the buffered reader +const bufferSize = 8096 + +// file is an interface allowing us to abstract a file. +type file interface { + Stat() (os.FileInfo, error) + io.Reader +} + +var ( + // stdIn is os.Stdin but can be replaced for testing purpose. + stdIn file = os.Stdin + hostName, _ = os.Hostname() + // defaultStdInCfg is the default config for stdin target if none provided. + defaultStdInCfg = scrape.Config{ + JobName: "stdin", + ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{ + StaticConfigs: []*targetgroup.Group{ + {Labels: model.LabelSet{"job": "stdin"}}, + {Labels: model.LabelSet{"hostname": model.LabelValue(hostName)}}, + }, + }, + } +) + +func isStdinPipe() bool { + info, err := stdIn.Stat() + if err != nil { + level.Warn(util.Logger).Log("err", err) + return false + } + m := info.Mode() + if m&os.ModeCharDevice != 0 || info.Size() <= 0 { + return false + } + return true +} + +type Shutdownable interface { + Shutdown() +} + +type stdinTargetManager struct { + *readerTarget + app Shutdownable +} + +func newStdinTargetManager(app Shutdownable, client api.EntryHandler, configs []scrape.Config) (*stdinTargetManager, error) { + reader, err := newReaderTarget(stdIn, client, getStdinConfig(configs)) + if err != nil { + return nil, err + } + stdinManager := &stdinTargetManager{ + readerTarget: reader, + app: app, + } + // when we're done flushing our stdin we can shutdown the app. + go func() { + <-reader.ctx.Done() + app.Shutdown() + }() + return stdinManager, nil +} + +func getStdinConfig(configs []scrape.Config) scrape.Config { + cfg := defaultStdInCfg + // if we receive configs we use the first one. + if len(configs) > 0 { + if len(configs) > 1 { + level.Warn(util.Logger).Log("msg", fmt.Sprintf("too many scrape configs, skipping %d configs.", len(configs)-1)) + } + cfg = configs[0] + } + return cfg +} + +func (t *stdinTargetManager) Ready() bool { + return t.ctx.Err() == nil +} +func (t *stdinTargetManager) Stop() { t.cancel() } +func (t *stdinTargetManager) ActiveTargets() map[string][]Target { return nil } +func (t *stdinTargetManager) AllTargets() map[string][]Target { return nil } + +type readerTarget struct { + in *bufio.Reader + out api.EntryHandler + lbs model.LabelSet + logger log.Logger + + cancel context.CancelFunc + ctx context.Context +} + +func newReaderTarget(in io.Reader, client api.EntryHandler, cfg scrape.Config) (*readerTarget, error) { + pipeline, err := stages.NewPipeline(log.With(util.Logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + lbs := model.LabelSet{} + for _, static := range cfg.ServiceDiscoveryConfig.StaticConfigs { + if static != nil && static.Labels != nil { + lbs = lbs.Merge(static.Labels) + } + } + ctx, cancel := context.WithCancel(context.Background()) + t := &readerTarget{ + in: bufio.NewReaderSize(in, bufferSize), + out: pipeline.Wrap(client), + cancel: cancel, + ctx: ctx, + lbs: lbs, + logger: log.With(util.Logger, "component", "reader"), + } + go t.read() + + return t, nil +} + +func (t *readerTarget) read() { + defer t.cancel() + + for { + if t.ctx.Err() != nil { + return + } + line, err := t.in.ReadString('\n') + if err != nil && err != io.EOF { + level.Warn(t.logger).Log("msg", "error reading buffer", "err", err) + return + } + line = strings.TrimRight(line, "\r\n") + if line == "" { + if err == io.EOF { + return + } + continue + } + if err := t.out.Handle(t.lbs, time.Now(), line); err != nil { + level.Error(t.logger).Log("msg", "error sending line", "err", err) + } + if err == io.EOF { + return + } + } +} diff --git a/pkg/promtail/targets/stdin_target_manager_test.go b/pkg/promtail/targets/stdin_target_manager_test.go new file mode 100644 index 0000000000000..d4d63cdcdfa2f --- /dev/null +++ b/pkg/promtail/targets/stdin_target_manager_test.go @@ -0,0 +1,184 @@ +package targets + +import ( + "bytes" + "io" + "os" + "strings" + "testing" + "time" + + "github.com/grafana/loki/pkg/logentry/stages" + "github.com/grafana/loki/pkg/promtail/scrape" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +type line struct { + labels model.LabelSet + entry string +} + +type clientRecorder struct { + recorded []line +} + +func (c *clientRecorder) Handle(labels model.LabelSet, time time.Time, entry string) error { + c.recorded = append(c.recorded, line{labels: labels, entry: entry}) + return nil +} + +func Test_newReaderTarget(t *testing.T) { + tests := []struct { + name string + in io.Reader + cfg scrape.Config + want []line + wantErr bool + }{ + { + "no newlines", + bytes.NewReader([]byte("bar")), + scrape.Config{}, + []line{ + {model.LabelSet{}, "bar"}, + }, + false, + }, + { + "empty", + bytes.NewReader([]byte("")), + scrape.Config{}, + nil, + false, + }, + { + "newlines", + bytes.NewReader([]byte("\nfoo\r\nbar")), + scrape.Config{}, + []line{ + {model.LabelSet{}, "foo"}, + {model.LabelSet{}, "bar"}, + }, + false, + }, + { + "pipeline", + bytes.NewReader([]byte("\nfoo\r\nbar")), + scrape.Config{ + PipelineStages: loadConfig(stagesConfig), + }, + []line{ + {model.LabelSet{"new_key": "hello world!"}, "foo"}, + {model.LabelSet{"new_key": "hello world!"}, "bar"}, + }, + false, + }, + { + "default config", + bytes.NewReader([]byte("\nfoo\r\nbar")), + defaultStdInCfg, + []line{ + {model.LabelSet{"job": "stdin", "hostname": model.LabelValue(hostName)}, "foo"}, + {model.LabelSet{"job": "stdin", "hostname": model.LabelValue(hostName)}, "bar"}, + }, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + recorder := &clientRecorder{} + got, err := newReaderTarget(tt.in, recorder, tt.cfg) + if (err != nil) != tt.wantErr { + t.Errorf("newReaderTarget() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil { + return + } + <-got.ctx.Done() + require.Equal(t, tt.want, recorder.recorded) + }) + } +} + +type mockShutdownable struct { + called chan bool +} + +func (m *mockShutdownable) Shutdown() { + m.called <- true +} + +type fakeStdin struct { + io.Reader + os.FileInfo +} + +func newFakeStin(data string) *fakeStdin { + return &fakeStdin{ + Reader: strings.NewReader(data), + } +} + +func (f fakeStdin) Stat() (os.FileInfo, error) { return f.FileInfo, nil } + +func Test_Shutdown(t *testing.T) { + stdIn = newFakeStin("line") + appMock := &mockShutdownable{called: make(chan bool, 1)} + recorder := &clientRecorder{} + manager, err := newStdinTargetManager(appMock, recorder, []scrape.Config{{}}) + require.NoError(t, err) + require.NotNil(t, manager) + called := <-appMock.called + require.Equal(t, true, called) + require.Equal(t, []line{{labels: model.LabelSet{}, entry: "line"}}, recorder.recorded) +} + +func Test_StdinConfigs(t *testing.T) { + + // should take the first config + require.Equal(t, scrape.DefaultScrapeConfig, getStdinConfig([]scrape.Config{ + scrape.DefaultScrapeConfig, + {}, + })) + // or use the default if none if provided + require.Equal(t, defaultStdInCfg, getStdinConfig([]scrape.Config{})) +} + +type mockFileInfo struct{} + +func (mockFileInfo) Name() string { return "" } +func (mockFileInfo) Size() int64 { return 1 } +func (mockFileInfo) Mode() os.FileMode { return 1 } +func (mockFileInfo) ModTime() time.Time { return time.Now() } +func (mockFileInfo) Sys() interface{} { return nil } +func (mockFileInfo) IsDir() bool { return false } + +func Test_isPipe(t *testing.T) { + fake := newFakeStin("line") + fake.FileInfo = &mockFileInfo{} + stdIn = fake + require.Equal(t, true, isStdinPipe()) + stdIn = os.Stdin + require.Equal(t, false, isStdinPipe()) +} + +var stagesConfig = ` +pipeline_stages: +- template: + source: new_key + template: 'hello world!' +- labels: + new_key: +` + +func loadConfig(yml string) stages.PipelineStages { + var config map[string]interface{} + err := yaml.Unmarshal([]byte(yml), &config) + if err != nil { + panic(err) + } + return config["pipeline_stages"].([]interface{}) +}