Skip to content

Commit

Permalink
Pipe data to Promtail (#1649)
Browse files Browse the repository at this point in the history
* Adds readerTarget that read from io.Reader

Signed-off-by: Cyril Tovena <[email protected]>

* Updates comment and function names.

Signed-off-by: Cyril Tovena <[email protected]>

* Adds stdin manager

Signed-off-by: Cyril Tovena <[email protected]>

* Add more tests.

Signed-off-by: Cyril Tovena <[email protected]>

* Adds default config with labels and support for static configs labels.

Signed-off-by: Cyril Tovena <[email protected]>

* Adds a test with pipeline stages.

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes race on shutdown and adds documentation

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes deadlock on server.run

Signed-off-by: Cyril Tovena <[email protected]>

* Update docs/clients/promtail/troubleshooting.md

Co-Authored-By: Owen Diehl <[email protected]>

* gofmt

Signed-off-by: Cyril Tovena <[email protected]>

* gofmt

Signed-off-by: Cyril Tovena <[email protected]>

* unlock mutex if Promtail.Run is called when Promtail is stopped

Co-authored-by: Owen Diehl <[email protected]>
Co-authored-by: Robert Fratto <[email protected]>
  • Loading branch information
3 people authored Feb 11, 2020
1 parent 0c08e3e commit 814cc87
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 22 deletions.
3 changes: 1 addition & 2 deletions cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ func main() {
}

level.Info(util.Logger).Log("msg", "Starting Promtail", "version", version.Info())
defer p.Shutdown()

if err := p.Run(); err != nil {
level.Error(util.Logger).Log("msg", "error starting promtail", "error", err)
os.Exit(1)
}

p.Shutdown()
}
47 changes: 47 additions & 0 deletions docs/clients/promtail/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,53 @@
This document describes known failure modes of `promtail` on edge cases and the
adopted trade-offs.

## Pipe data to Promtail

Promtail supports piping data for sending logs to Loki. This is a very useful way to troubleshooting your configuration.
Once you have promtail installed you can for instance use the following command to send logs to a local Loki instance:

```bash
cat my.log | promtail --client.url http://127.0.0.1:3100/loki/api/v1/push
```

You can also add additional labels from command line using:

```bash
cat my.log | promtail --client.url http://127.0.0.1:3100/loki/api/v1/push --client.external-labels=k1=v1,k2=v2
```

This will add labels `k1` and `k2` with respective values `v1` and `v2`.

In pipe mode Promtail also support file configuration using `--config.file`, however do note that positions config is not used and
only **the first scrape config is used**.

[`static_configs:`](./configuration) can be used to provide static labels, although the targets property is ignored.

If you don't provide any [`scrape_config:`](./configuration#scrape_config) a default one is used which will automatically adds the following default labels: `{job="stdin",hostname="<detected_hostname>"}`.

For example you could use this config below to parse and add the label `level` on all your piped logs:

```yaml
clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: system
pipeline_stages:
- regex:
expression: '(level|lvl|severity)=(?P<level>\\w+)'
- labels:
level:
static_configs:
- labels:
job: my-stdin-logs
```
```
cat my.log | promtail --config.file promtail.yaml
```


## A tailed file is truncated while `promtail` is not running

Given the following order of events:
Expand Down
47 changes: 29 additions & 18 deletions pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
package promtail

import (
"sync"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/config"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/server"
"github.com/grafana/loki/pkg/promtail/targets"
)

// Promtail is the root struct for Promtail...
type Promtail struct {
client client.Client
positions *positions.Positions
targetManagers *targets.TargetManagers
server *server.Server

stopped bool
mtx sync.Mutex
}

// New makes a new Promtail.
func New(cfg config.Config) (*Promtail, error) {
positions, err := positions.New(util.Logger, cfg.PositionsConfig)
if err != nil {
return nil, err
}

if cfg.ClientConfig.URL.URL != nil {
// if a single client config is used we add it to the multiple client config for backward compatibility
Expand All @@ -35,33 +34,45 @@ func New(cfg config.Config) (*Promtail, error) {
return nil, err
}

tms, err := targets.NewTargetManagers(util.Logger, positions, client, cfg.ScrapeConfig, &cfg.TargetConfig)
promtail := &Promtail{
client: client,
}

tms, err := targets.NewTargetManagers(promtail, util.Logger, cfg.PositionsConfig, client, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
return nil, err
}

promtail.targetManagers = tms
server, err := server.New(cfg.ServerConfig, tms)
if err != nil {
return nil, err
}

return &Promtail{
client: client,
positions: positions,
targetManagers: tms,
server: server,
}, nil
promtail.server = server
return promtail, nil
}

// Run the promtail; will block until a signal is received.
func (p *Promtail) Run() error {
p.mtx.Lock()
// if we stopped promtail before the server even started we can return without starting.
if p.stopped {
p.mtx.Unlock()
return nil
}
p.mtx.Unlock() // unlock before blocking
return p.server.Run()
}

// Shutdown the promtail.
func (p *Promtail) Shutdown() {
p.server.Shutdown()
p.targetManagers.Stop()
p.positions.Stop()
p.mtx.Lock()
defer p.mtx.Unlock()
p.stopped = true
if p.server != nil {
p.server.Shutdown()
}
if p.targetManagers != nil {
p.targetManagers.Stop()
}
p.client.Stop()
}
27 changes: 25 additions & 2 deletions pkg/promtail/targets/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package targets

import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"

Expand All @@ -19,12 +20,14 @@ type targetManager interface {
// TargetManagers manages a list of target managers.
type TargetManagers struct {
targetManagers []targetManager
positions *positions.Positions
}

// NewTargetManagers makes a new TargetManagers
func NewTargetManagers(
app Shutdownable,
logger log.Logger,
positions *positions.Positions,
positionsConfig positions.Config,
client api.EntryHandler,
scrapeConfigs []scrape.Config,
targetConfig *Config,
Expand All @@ -34,6 +37,20 @@ func NewTargetManagers(
var journalScrapeConfigs []scrape.Config
var syslogScrapeConfigs []scrape.Config

if isStdinPipe() {
stdin, err := newStdinTargetManager(app, client, scrapeConfigs)
if err != nil {
return nil, err
}
targetManagers = append(targetManagers, stdin)
return &TargetManagers{targetManagers: targetManagers}, nil
}

positions, err := positions.New(util.Logger, positionsConfig)
if err != nil {
return nil, err
}

for _, cfg := range scrapeConfigs {
if cfg.HasServiceDiscoveryConfig() {
fileScrapeConfigs = append(fileScrapeConfigs, cfg)
Expand Down Expand Up @@ -84,7 +101,10 @@ func NewTargetManagers(
targetManagers = append(targetManagers, syslogTargetManager)
}

return &TargetManagers{targetManagers: targetManagers}, nil
return &TargetManagers{
targetManagers: targetManagers,
positions: positions,
}, nil

}

Expand Down Expand Up @@ -125,4 +145,7 @@ func (tm *TargetManagers) Stop() {
for _, t := range tm.targetManagers {
t.Stop()
}
if tm.positions != nil {
tm.positions.Stop()
}
}
168 changes: 168 additions & 0 deletions pkg/promtail/targets/stdin_target_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package targets

import (
"bufio"
"context"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/scrape"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
)

// bufferSize is the size of the buffered reader
const bufferSize = 8096

// file is an interface allowing us to abstract a file.
type file interface {
Stat() (os.FileInfo, error)
io.Reader
}

var (
// stdIn is os.Stdin but can be replaced for testing purpose.
stdIn file = os.Stdin
hostName, _ = os.Hostname()
// defaultStdInCfg is the default config for stdin target if none provided.
defaultStdInCfg = scrape.Config{
JobName: "stdin",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*targetgroup.Group{
{Labels: model.LabelSet{"job": "stdin"}},
{Labels: model.LabelSet{"hostname": model.LabelValue(hostName)}},
},
},
}
)

func isStdinPipe() bool {
info, err := stdIn.Stat()
if err != nil {
level.Warn(util.Logger).Log("err", err)
return false
}
m := info.Mode()
if m&os.ModeCharDevice != 0 || info.Size() <= 0 {
return false
}
return true
}

type Shutdownable interface {
Shutdown()
}

type stdinTargetManager struct {
*readerTarget
app Shutdownable
}

func newStdinTargetManager(app Shutdownable, client api.EntryHandler, configs []scrape.Config) (*stdinTargetManager, error) {
reader, err := newReaderTarget(stdIn, client, getStdinConfig(configs))
if err != nil {
return nil, err
}
stdinManager := &stdinTargetManager{
readerTarget: reader,
app: app,
}
// when we're done flushing our stdin we can shutdown the app.
go func() {
<-reader.ctx.Done()
app.Shutdown()
}()
return stdinManager, nil
}

func getStdinConfig(configs []scrape.Config) scrape.Config {
cfg := defaultStdInCfg
// if we receive configs we use the first one.
if len(configs) > 0 {
if len(configs) > 1 {
level.Warn(util.Logger).Log("msg", fmt.Sprintf("too many scrape configs, skipping %d configs.", len(configs)-1))
}
cfg = configs[0]
}
return cfg
}

func (t *stdinTargetManager) Ready() bool {
return t.ctx.Err() == nil
}
func (t *stdinTargetManager) Stop() { t.cancel() }
func (t *stdinTargetManager) ActiveTargets() map[string][]Target { return nil }
func (t *stdinTargetManager) AllTargets() map[string][]Target { return nil }

type readerTarget struct {
in *bufio.Reader
out api.EntryHandler
lbs model.LabelSet
logger log.Logger

cancel context.CancelFunc
ctx context.Context
}

func newReaderTarget(in io.Reader, client api.EntryHandler, cfg scrape.Config) (*readerTarget, error) {
pipeline, err := stages.NewPipeline(log.With(util.Logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
lbs := model.LabelSet{}
for _, static := range cfg.ServiceDiscoveryConfig.StaticConfigs {
if static != nil && static.Labels != nil {
lbs = lbs.Merge(static.Labels)
}
}
ctx, cancel := context.WithCancel(context.Background())
t := &readerTarget{
in: bufio.NewReaderSize(in, bufferSize),
out: pipeline.Wrap(client),
cancel: cancel,
ctx: ctx,
lbs: lbs,
logger: log.With(util.Logger, "component", "reader"),
}
go t.read()

return t, nil
}

func (t *readerTarget) read() {
defer t.cancel()

for {
if t.ctx.Err() != nil {
return
}
line, err := t.in.ReadString('\n')
if err != nil && err != io.EOF {
level.Warn(t.logger).Log("msg", "error reading buffer", "err", err)
return
}
line = strings.TrimRight(line, "\r\n")
if line == "" {
if err == io.EOF {
return
}
continue
}
if err := t.out.Handle(t.lbs, time.Now(), line); err != nil {
level.Error(t.logger).Log("msg", "error sending line", "err", err)
}
if err == io.EOF {
return
}
}
}
Loading

0 comments on commit 814cc87

Please sign in to comment.