Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promtail: Add systemd journal support #730

Merged
merged 9 commits into from
Jul 15, 2019
18 changes: 16 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,7 @@
[[override]]
name = "k8s.io/client-go"
revision = "1a26190bd76a9017e289958b9fba936430aa3704"

[[constraint]]
name = "github.com/coreos/go-systemd"
version = "19.0.0"
rfratto marked this conversation as resolved.
Show resolved Hide resolved
79 changes: 76 additions & 3 deletions docs/promtail-examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This example of config promtail based on original docker [config](https://github
and show how work with 2 and more sources:

Filename for example: my-docker-config.yaml
```
```yaml
server:
http_listen_port: 9080
grpc_listen_port: 0
Expand Down Expand Up @@ -45,6 +45,7 @@ scrape_configs:
__path__: /srv/log/someone_service/*.log

```

#### Description

Scrape_config section of config.yaml contents contains various jobs for parsing your logs
Expand All @@ -54,15 +55,87 @@ Scrape_config section of config.yaml contents contains various jobs for parsing
`__path__` it is path to directory where stored your logs.

If you run promtail and this config.yaml in Docker container, don't forget use docker volumes for mapping real directories
with log to those folders in the container.
with log to those folders in the container.

#### Example Use
1) Create folder, for example `promtail`, then new sub directory `build/conf` and place there `my-docker-config.yaml`.
2) Create new Dockerfile in root folder `promtail`, with contents
```
```dockerfile
FROM grafana/promtail:latest
COPY build/conf /etc/promtail
```
3) Create your Docker image based on original Promtail image and tag it, for example `mypromtail-image`
3) After that you can run Docker container by this command:
`docker run -d --name promtail --network loki_network -p 9080:9080 -v /var/log:/var/log -v /srv/log/someone_service:/srv/log/someone_service mypromtail-image -config.file=/etc/promtail/my-docker-config.yaml`

## Simple Systemd Journal Config

This example demonstrates how to configure promtail to listen to systemd journal
entries and write them to Loki:

Filename for example: my-systemd-journal-config.yaml

```yaml
server:
http_listen_port: 9080
grpc_listen_port: 0

positions:
filename: /tmp/positions.yaml

clients:
- url: http://ip_or_hostname_where_loki_runns:3100/api/prom/push

scrape_configs:
- job_name: journal
journal:
since: 0
path: /var/log/journal
labels:
job: systemd-journal
relabel_configs:
- source_labels: ['__journal__systemd_unit']
target_label: 'unit'
```

### Description

Just like the Docker example, the `scrape_configs` sections holds various
jobs for parsing logs. A job with a `journal` key configures it for systemd
journal reading.

`since` is an optional unsigned integer value determining the earliest
log that should be read. If nonzero, the integer is read as a nanosecond
offset from the current system time when the reader is initialized. Zero
is the default value, indicating that all journal entries should be read.

`path` is an optional string specifying the path to read journal entries
from. If unspecified, defaults to the system default (`/var/log/journal`).

`labels`: is a map of string values specifying labels that should always
be associated with each log entry being read from the systemd journal.
In our example, each log will have a label of `job=systemd-journal`.

Every field written to the systemd journal is available for processing
in the `relabel_configs` section. Label names are converted to lowercase
and prefixed with `__journal_`. After `relabel_configs` processes all
labels for a job entry, any label starting with `__` is deleted.

Our example renames the `_SYSTEMD_UNIT` label (available as
`__journal__systemd_unit` in promtail) to `unit` so it will be available
in Loki. All other labels from the journal entry are dropped.

### Example Use

`promtail` must have access to both the journald socket
(`/var/run/systemd/journal/socket`) and the journal path (`/var/log/journal`)
for journal support to work correctly.

If running with Docker, that means to bind those two paths:

```bash
docker run -d --name promtail --network loki_network -p 9080:9080 \
-v /var/log/journal:/var/log/journal \
-v /var/run/systemd/journal/socket:/var/run/systemd/journal/socket \
mypromtail-image -config.file=/etc/promtail/my-systemd-journal-config.yaml
```
20 changes: 20 additions & 0 deletions pkg/promtail/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package scrape

import (
"fmt"
"time"

"github.com/prometheus/common/model"

sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/pkg/relabel"
Expand All @@ -15,10 +18,27 @@ type Config struct {
JobName string `yaml:"job_name,omitempty"`
EntryParser api.EntryParser `yaml:"entry_parser"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
}

// JournalTargetConfig describes systemd journal records to scrape.
type JournalTargetConfig struct {
// Since holds the time after time.Now() to start retrieving records
rfratto marked this conversation as resolved.
Show resolved Hide resolved
// for. If 0, records will be retrieved from the earliest entry in
// the journal.
Since time.Duration `yaml:"since,omitempty"`

// Labels optionally holds labels to associate with each record coming out
// of the journal.
Labels model.LabelSet `yaml:"labels"`

// Path to a directory to read journal entries from. Defaults to system path
// if empty.
Path string `yaml:"path"`
rfratto marked this conversation as resolved.
Show resolved Hide resolved
}

// DefaultScrapeConfig is the default Config.
var DefaultScrapeConfig = Config{
EntryParser: api.Docker,
Expand Down
182 changes: 182 additions & 0 deletions pkg/promtail/targets/journaltarget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package targets

import (
"fmt"
"io/ioutil"
"strings"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"

"github.com/go-kit/kit/log/level"

"github.com/grafana/loki/pkg/promtail/positions"

"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/promtail/scrape"

"github.com/coreos/go-systemd/sdjournal"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

const (
// journalEmptyStr is represented as a single-character space because
// returning an empty string from sdjournal.JournalReaderConfig's
// Formatter causes an immediate EOF and induces performance issues
// with how that is handled in sdjournal.
journalEmptyStr = " "
)

// JournalTarget tails systemd journal entries.
type JournalTarget struct {
logger log.Logger
handler api.EntryHandler
positions *positions.Positions
relabelConfig []*relabel.Config
config *scrape.JournalTargetConfig
labels model.LabelSet

r *sdjournal.JournalReader
until chan time.Time
}

// NewJournalTarget configures a new JournalTarget.
func NewJournalTarget(
logger log.Logger,
handler api.EntryHandler,
positions *positions.Positions,
relabelConfig []*relabel.Config,
targetConfig *scrape.JournalTargetConfig,
) (*JournalTarget, error) {

until := make(chan time.Time)
t := &JournalTarget{
logger: logger,
handler: handler,
positions: positions,
relabelConfig: relabelConfig,
labels: targetConfig.Labels,
config: targetConfig,

until: until,
}

var err error
t.r, err = sdjournal.NewJournalReader(sdjournal.JournalReaderConfig{
Since: targetConfig.Since,
Formatter: t.formatter,
})
if err != nil {
return nil, errors.Wrap(err, "creating journal reader")
}

go func() {
err := t.r.Follow(until, ioutil.Discard)
if err != sdjournal.ErrExpired {
level.Error(t.logger).Log("msg", "received error during sdjournal follow", "err", err.Error())
}
}()

return t, nil
}

func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) {
ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
msg, ok := entry.Fields["MESSAGE"]
if !ok {
level.Debug(t.logger).Log("msg", "received journal entry with no MESSAGE field")
return journalEmptyStr, nil
}
entryLabels := makeJournalFields(entry.Fields)

// Add constant labels
for k, v := range t.labels {
entryLabels[string(k)] = string(v)
}

processedLabels := relabel.Process(labels.FromMap(entryLabels), t.relabelConfig...)

processedLabelsMap := processedLabels.Map()
labels := make(model.LabelSet, len(processedLabelsMap))
for k, v := range processedLabelsMap {
if k[0:2] == "__" {
continue
}

labels[model.LabelName(k)] = model.LabelValue(v)
}
if len(labels) == 0 {
// No labels, drop journal entry
return journalEmptyStr, nil
}

// TODO(rfratto): positions support? There are two ways to be able to
// uniquely identify the offset from a journal entry: monotonic timestamps
// and the cursor position.
//
// The monotonic timestamp is a tuple of a 128-bit boot ID and a 64-bit
// nanosecond timestamp. The sdjournal library currently does not expose
// the seek functionality for monotonic timestamps.
//
// The cursor position is an arbtirary string identifying the offset
// in a journal. The documentation for systemd declares the cursor
// string as opaque and shouldn't be parsed by users. The sdjournal
// library *does* expose the seek funcationlity using the cursor
// position.
//
// With either solution, the current positions.Positions is unable
// to handle the timestamps as it is only equipped to handle int64 ts.

err := t.handler.Handle(labels, ts, msg)
return journalEmptyStr, err
}

// Type returns JournalTargetType.
func (t *JournalTarget) Type() TargetType {
return JournalTargetType
}

// Ready indicates whether or not the journal is ready to be
// read from.
func (t *JournalTarget) Ready() bool {
return true
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO(rfratto): Perhaps the Target interface should remove DiscoveredLabels
// and Labels and instead have a Dropped method.

// DiscoveredLabels satisfies the Target interface. Returns nil for
// JournalTarget as there are no discovered labels present as a
// JournalTarget processes.
func (t *JournalTarget) DiscoveredLabels() model.LabelSet {
return nil
}

// Labels satisfies the Target interface. Returns nil for JournalTarget
// as there are is no guaranteed constant list of labels present as a
// JournalTarget processes.
func (t *JournalTarget) Labels() model.LabelSet {
return nil
}

// Details returns target-specific details (currently nil).
func (t *JournalTarget) Details() interface{} {
return nil
}

// Stop shuts down the JournalTarget.
func (t *JournalTarget) Stop() error {
t.until <- time.Now()
return t.r.Close()
}

func makeJournalFields(fields map[string]string) map[string]string {
result := make(map[string]string, len(fields))
for k, v := range fields {
result[fmt.Sprintf("__journal_%s", strings.ToLower(k))] = v
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
}
return result
}
Loading