Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load Filebeat modules pipelines on -setup #3394

Merged
merged 2 commits into from
Jan 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
Expand All @@ -18,7 +19,10 @@ import (
"github.com/elastic/beats/filebeat/spooler"
)

var once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
var (
once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
setup = flag.Bool("setup", false, "Run the setup phase for the modules")
)

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
Expand Down Expand Up @@ -67,11 +71,33 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return fb, nil
}

// Setup is called on user request (the -setup flag) to do the initial Beat setup.
func (fb *Filebeat) Setup(b *beat.Beat) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we have a Setup step in the 1.0 beats interface? Old days :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah :). I didn't add it yet into the interface but I have a feeling that's where it will eventually get there.

esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
return fmt.Errorf("Setup requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return fmt.Errorf("Error creating ES client: %v", err)
}
defer esClient.Close()

return fb.moduleRegistry.Setup(esClient)
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config

if *setup {
err = fb.Setup(b)
if err != nil {
return err
}
}

waitFinished := newSignalWait()
waitEvents := newSignalWait()

Expand Down
171 changes: 0 additions & 171 deletions filebeat/filebeat.py

This file was deleted.

27 changes: 26 additions & 1 deletion filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package fileset

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -242,7 +243,31 @@ func (fs *Fileset) getPipelineID() (string, error) {
return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

return fmt.Sprintf("%s-%s-%s", fs.mcfg.Module, fs.name, removeExt(filepath.Base(path))), nil
return formatPipelineID(fs.mcfg.Module, fs.name, path), nil
}

func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

f, err := os.Open(filepath.Join(fs.modulePath, fs.name, path))
if err != nil {
return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

dec := json.NewDecoder(f)
err = dec.Decode(&content)
if err != nil {
return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
return formatPipelineID(fs.mcfg.Module, fs.name, path), content, nil
}

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(module, fileset, path string) string {
return fmt.Sprintf("%s-%s-%s", module, fileset, removeExt(filepath.Base(path)))
}

// removeExt returns the file name without the extension. If no dot is found,
Expand Down
17 changes: 15 additions & 2 deletions filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !integration

package fileset

import (
Expand Down Expand Up @@ -176,7 +178,18 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) {
assert.True(t, cfg.HasField("paths"))
assert.True(t, cfg.HasField("exclude_files"))
assert.True(t, cfg.HasField("close_eof"))
pipeline_id := fs.vars["beat"].(map[string]interface{})["pipeline_id"]
assert.Equal(t, "nginx-access-with_plugins", pipeline_id)
pipelineID := fs.vars["beat"].(map[string]interface{})["pipeline_id"]
assert.Equal(t, "nginx-access-with_plugins", pipelineID)

}

func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read())

pipelineID, content, err := fs.GetPipeline()
assert.NoError(t, err)
assert.Equal(t, "nginx-access-with_plugins", pipelineID)
assert.Contains(t, content, "description")
assert.Contains(t, content, "processors")
}
33 changes: 33 additions & 0 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,36 @@ func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) {
}
return result, nil
}

// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.
type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) error
}

// Setup is called on -setup and loads the pipelines for each configured fileset.
func (reg *ModuleRegistry) Setup(esClient PipelineLoader) error {
for module, filesets := range reg.registry {
for name, fileset := range filesets {
pipelineID, content, err := fileset.GetPipeline()
if err != nil {
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}
err = loadPipeline(esClient, pipelineID, content)
if err != nil {
return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
}
}
}
return nil
}

func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}) error {
path := "/_ingest/pipeline/" + pipelineID
err := esClient.LoadJSON(path, content)
if err != nil {
return fmt.Errorf("couldn't load template: %v", err)
}
logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
return nil
}
58 changes: 58 additions & 0 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// +build integration

package fileset

import (
"path/filepath"
"testing"

"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/stretchr/testify/assert"
)

func TestLoadPipeline(t *testing.T) {
client := elasticsearch.GetTestingElasticsearch()
client.Request("DELETE", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)

content := map[string]interface{}{
"description": "describe pipeline",
"processors": []map[string]interface{}{
{
"set": map[string]interface{}{
"field": "foo",
"value": "bar",
},
},
},
}

err := loadPipeline(client, "my-pipeline-id", content)
assert.NoError(t, err)

status, _, _ := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)
assert.Equal(t, 200, status)
}

func TestSetupNginx(t *testing.T) {
client := elasticsearch.GetTestingElasticsearch()
client.Request("DELETE", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil)
client.Request("DELETE", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil)

modulesPath, err := filepath.Abs("../module")
assert.NoError(t, err)

configs := []ModuleConfig{
ModuleConfig{Module: "nginx"},
}

reg, err := newModuleRegistry(modulesPath, configs, nil)
assert.NoError(t, err)

err = reg.Setup(client)
assert.NoError(t, err)

status, _, _ := client.Request("GET", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil)
assert.Equal(t, 200, status)
status, _, _ = client.Request("GET", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil)
assert.Equal(t, 200, status)
}
2 changes: 2 additions & 0 deletions filebeat/fileset/modules_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !integration

package fileset

import (
Expand Down
Loading