Skip to content

Commit

Permalink
Use local timezone for TZ conversion in the FB system module (#5647)
Browse files Browse the repository at this point in the history
* Use local timezone for TZ conversion in the FB system module

This adds a `convert_timezone` fileset parameter that, when enabled,
does two things:

* Uses the `add_locale` processor in the FB proespector config
* Uses `{{ beat.timezone }}` as the `timezone` parameter for the
  date processor in the Ingest Node pipeline. This parameter accepts
  templates starting with ES 6.1.

For the moment the `convert_timezone` flag is off by default, to keep
backwards compatibility and because it results in an error when used
with ES < 6.1.

Closes #3898.

For now this is only applied to the system module, but likely more
modules would benefit from this feature.

* Automatically turn off given options depending on the ES version.

* Added the convert_timezone flag to the auth fileset as well

* Added tests

* Docs & changelog

* Addressed comments
  • Loading branch information
tsg authored and monicasarbu committed Nov 21, 2017
1 parent c5a9f3a commit e125cf1
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]
- Add experimental Docker `json-file` prospector . {pull}5402[5402]
- Add experimental Docker autodiscover functionality. {pull}5245[5245]
- Add option to convert the timestamps to UTC in the system module. {pull}5647[5647]

*Heartbeat*

Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/include/var-convert-timezone.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
*`var.convert_timezone`*::

If this option is enabled, Filebeat reads the local timezone and uses it at log
parsing time to convert the timestamp to UTC. The local timezone is also added
in each event in a dedicated field (`beat.timezone`). The conversion is only
possible in Elasticsearch >= 6.1. If the Elasticsearch version is less than 6.1,
the `beat.timezone` field is added, but the conversion to UTC is not made. The
default is `false`.
14 changes: 12 additions & 2 deletions filebeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ image::./images/kibana-system.png[]
include::../include/configuring-intro.asciidoc[]

The following example shows how to set paths in the +modules.d/{modulename}.yml+
file to override the default paths for the syslog and authorization logs:
file to override the default paths for the syslog and authorization logs:

["source","yaml",subs="attributes"]
-----
Expand All @@ -55,7 +55,7 @@ To specify the same settings at the command line, you use:
-----


The command in the example assumes that you have already enabled the +{modulename}+ module.
The command in the example assumes that you have already enabled the +{modulename}+ module.

//set the fileset name used in the included example
:fileset_ex: syslog
Expand All @@ -68,6 +68,16 @@ include::../include/config-option-intro.asciidoc[]

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]

[float]
==== `auth` fileset settings

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]




[float]
Expand Down
6 changes: 6 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ filebeat.modules:
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
Expand All @@ -33,6 +36,9 @@ filebeat.modules:
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
Expand Down
87 changes: 74 additions & 13 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"text/template"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
)

Expand Down Expand Up @@ -51,6 +52,11 @@ func New(
}, nil
}

// String returns the module and the name of the fileset.
func (fs *Fileset) String() string {
return fs.mcfg.Module + "/" + fs.name
}

// Read reads the manifest file and evaluates the variables.
func (fs *Fileset) Read(beatVersion string) error {
var err error
Expand Down Expand Up @@ -155,18 +161,57 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) {
return vars, nil
}

// turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version`
// set.
func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) {
retVars := map[string]interface{}{}
for key, val := range vars {
retVars[key] = val
}

haveVersion, err := common.NewVersion(esVersion)
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err)
}

for _, vals := range fs.manifest.Vars {
var ok bool
name, ok := vals["name"].(string)
if !ok {
return nil, fmt.Errorf("Variable doesn't have a string 'name' key")
}

minESVersion, ok := vals["min_elasticsearch_version"].(map[string]interface{})
if ok {
minVersion, err := common.NewVersion(minESVersion["version"].(string))
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err)
}

logp.Debug("fileset", "Comparing ES version %s with requirement of %s", haveVersion, minVersion)

if haveVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], haveVersion)
}
}
}

return retVars, nil
}

// resolveVariable considers the value as a template so it can refer to built-in variables
// as well as other variables defined before them.
func resolveVariable(vars map[string]interface{}, value interface{}) (interface{}, error) {
switch v := value.(type) {
case string:
return applyTemplate(vars, v)
return applyTemplate(vars, v, false)
case []interface{}:
transformed := []interface{}{}
for _, val := range v {
s, ok := val.(string)
if ok {
transf, err := applyTemplate(vars, s)
transf, err := applyTemplate(vars, s, false)
if err != nil {
return nil, fmt.Errorf("array: %v", err)
}
Expand All @@ -180,9 +225,15 @@ func resolveVariable(vars map[string]interface{}, value interface{}) (interface{
return value, nil
}

// applyTemplate applies a Golang text/template
func applyTemplate(vars map[string]interface{}, templateString string) (string, error) {
tpl, err := template.New("text").Parse(templateString)
// applyTemplate applies a Golang text/template. If specialDelims is set to true,
// the delimiters are set to `{%` and `%}` instead of `{{` and `}}`. These are easier to use
// in pipeline definitions.
func applyTemplate(vars map[string]interface{}, templateString string, specialDelims bool) (string, error) {
tpl := template.New("text")
if specialDelims {
tpl = tpl.Delims("{%", "%}")
}
tpl, err := tpl.Parse(templateString)
if err != nil {
return "", fmt.Errorf("Error parsing template %s: %v", templateString, err)
}
Expand Down Expand Up @@ -215,7 +266,7 @@ func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) {
}

func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
path, err := applyTemplate(fs.vars, fs.manifest.Prospector)
path, err := applyTemplate(fs.vars, fs.manifest.Prospector, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the prospector path: %v", err)
}
Expand All @@ -224,7 +275,7 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
return nil, fmt.Errorf("Error reading prospector file %s: %v", path, err)
}

yaml, err := applyTemplate(fs.vars, string(contents))
yaml, err := applyTemplate(fs.vars, string(contents), false)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the prospector: %v", err)
}
Expand Down Expand Up @@ -269,27 +320,37 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {

// getPipelineID returns the Ingest Node pipeline ID
func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

return formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion), nil
}

func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
// GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs.
func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

f, err := os.Open(filepath.Join(fs.modulePath, fs.name, path))
strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path))
if err != nil {
return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

dec := json.NewDecoder(f)
err = dec.Decode(&content)
vars, err := fs.turnOffElasticsearchVars(fs.vars, esVersion)
if err != nil {
return "", nil, err
}

jsonString, err := applyTemplate(vars, string(strContents), true)
if err != nil {
return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

err = json.Unmarshal([]byte(jsonString), &content)
if err != nil {
return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
Expand Down
46 changes: 45 additions & 1 deletion filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
package fileset

import (
"encoding/json"
"fmt"
"path/filepath"
"runtime"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
)

func getModuleForTesting(t *testing.T, module, fileset string) *Fileset {
Expand Down Expand Up @@ -193,9 +196,50 @@ func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read("5.2.0"))

pipelineID, content, err := fs.GetPipeline()
pipelineID, content, err := fs.GetPipeline("5.2.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID)
assert.Contains(t, content, "description")
assert.Contains(t, content, "processors")
}

func TestGetPipelineConvertTS(t *testing.T) {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"fileset", "modules"})
}

// load system/syslog
modulesPath, err := filepath.Abs("../module")
assert.NoError(t, err)
fs, err := New(modulesPath, "syslog", &ModuleConfig{Module: "system"}, &FilesetConfig{
Var: map[string]interface{}{
"convert_timezone": true,
},
})
assert.NoError(t, err)
assert.NoError(t, fs.Read("6.1.0"))

// ES 6.0.0 should not have beat.timezone referenced
pipelineID, content, err := fs.GetPipeline("6.0.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err := json.Marshal(content)
assert.NoError(t, err)
assert.NotContains(t, string(marshaled), "beat.timezone")

// ES 6.1.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.1.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")

// ES 6.2.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.2.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")
}
2 changes: 1 addition & 1 deletion filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader) error {
}
}

pipelineID, content, err := fileset.GetPipeline()
pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion())
if err != nil {
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}
Expand Down
6 changes: 6 additions & 0 deletions filebeat/module/system/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
Expand All @@ -19,6 +22,9 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
6 changes: 6 additions & 0 deletions filebeat/module/system/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Authorization logs
auth:
enabled: true

# Set custom paths for the log files. If left empty,
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false
14 changes: 12 additions & 2 deletions filebeat/module/system/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ image::./images/kibana-system.png[]
include::../include/configuring-intro.asciidoc[]

The following example shows how to set paths in the +modules.d/{modulename}.yml+
file to override the default paths for the syslog and authorization logs:
file to override the default paths for the syslog and authorization logs:

["source","yaml",subs="attributes"]
-----
Expand All @@ -50,7 +50,7 @@ To specify the same settings at the command line, you use:
-----


The command in the example assumes that you have already enabled the +{modulename}+ module.
The command in the example assumes that you have already enabled the +{modulename}+ module.

//set the fileset name used in the included example
:fileset_ex: syslog
Expand All @@ -63,3 +63,13 @@ include::../include/config-option-intro.asciidoc[]

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]

[float]
==== `auth` fileset settings

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]


4 changes: 4 additions & 0 deletions filebeat/module/system/auth/config/auth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ exclude_files: [".gz$"]
multiline:
pattern: "^\\s"
match: after
{{ if .convert_timezone }}
processors:
- add_locale: ~
{{ end }}
1 change: 1 addition & 0 deletions filebeat/module/system/auth/ingest/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"MMM d HH:mm:ss",
"MMM dd HH:mm:ss"
],
{% if .convert_timezone %}"timezone": "{{ beat.timezone }}",{% end %}
"ignore_failure": true
}
},
Expand Down
Loading

0 comments on commit e125cf1

Please sign in to comment.