Skip to content

Commit

Permalink
[Elastic Log Driver] Add new initialization functions and common fiel…
Browse files Browse the repository at this point in the history
…ds (elastic#15786)

* add new initialization functions and other things for ECS and common fields

* clean up error handling
  • Loading branch information
fearful-symmetry authored Jan 24, 2020
1 parent 3b22afd commit 05c6121
Showing 1 changed file with 137 additions and 14 deletions.
151 changes: 137 additions & 14 deletions x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,27 @@ package pipelinemanager

import (
"crypto/sha1"
"encoding/json"
"fmt"
"io"
"os"
"sort"
"time"

"github.com/gofrs/uuid"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cloudid"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/version"
)

// makeConfigHash is the helper function that turns a user config into a hash
Expand All @@ -47,18 +52,6 @@ func makeConfigHash(cfg map[string]string) string {
// load pipeline starts up a new pipeline with the given config
func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Logger) (*Pipeline, error) {

hostname, err := os.Hostname()
if err != nil {
return nil, errors.Wrap(err, "error getting hostname")
}

info := beat.Info{
Beat: "elastic-logging-plugin",
Version: "0",
Name: name,
Hostname: hostname,
}

newCfg, err := parseCfgKeys(logOptsConfig)
if err != nil {
return nil, errors.Wrap(err, "error parsing config keys")
Expand All @@ -81,7 +74,12 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log
return nil, fmt.Errorf("unpacking config failed: %v", err)
}

processing, err := processing.MakeDefaultSupport(false)(info, log, cfg)
info, err := getBeatInfo(cfg)
if err != nil {
return nil, err
}

processing, err := processing.MakeDefaultBeatSupport(true)(info, log, cfg)
if err != nil {
return nil, errors.Wrap(err, "error in MakeDefaultSupport")
}
Expand Down Expand Up @@ -128,7 +126,6 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log

// parseCfgKeys helpfully parses the values in the map, so users can specify yml structures.
func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) {

outMap := make(map[string]interface{})

for cfgKey, strVal := range cfg {
Expand All @@ -141,3 +138,129 @@ func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) {

return outMap, nil
}

// getBeatInfo returns the beat.Info type needed to start the pipeline
func getBeatInfo(cfg *common.Config) (beat.Info, error) {
vers := version.GetDefaultVersion()
hostname, err := os.Hostname()
if err != nil {
return beat.Info{}, errors.Wrap(err, "error getting hostname")
}
eid, err := uuid.NewV4()
if err != nil {
return beat.Info{}, errors.Wrap(err, "error creating ephemeral ID")
}

type nameStr struct {
Name string `config:"name"`
}
name := nameStr{}
err = cfg.Unpack(&name)
if err != nil {
return beat.Info{}, fmt.Errorf("unpacking config failed: %v", err)
}

if name.Name == "" {
name.Name = "elastic-log-driver-" + hostname
}
id, err := loadMeta("/tmp/meta.json")
if err != nil {
return beat.Info{}, errors.Wrap(err, "error loading UUID")
}

info := beat.Info{
Beat: "elastic-logging-plugin",
Name: name.Name,
Hostname: hostname,
Version: vers,
EphemeralID: eid,
ID: id,
}

return info, nil

}

// loadMeta loads the metadata file that contains the UUID
func loadMeta(metaPath string) (uuid.UUID, error) {
type meta struct {
UUID uuid.UUID `json:"uuid"`
}
// check for an existing file
f, err := openRegular(metaPath)
if err != nil && !os.IsNotExist(err) {
return uuid.Nil, errors.Wrapf(err, "beat meta file %s failed to open", metaPath)
}

//return the UUID if it exists
if err == nil {
m := meta{}
if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF {
f.Close()
return uuid.Nil, errors.Wrapf(err, "Error reading %s", metaPath)
}

f.Close()
if m.UUID != uuid.Nil {
return m.UUID, nil
}
}

// file does not exist or ID is invalid, let's create a new one
newID, err := uuid.NewV4()
if err != nil {
return uuid.Nil, errors.Wrap(err, "error creating ID")
}
// write temporary file first
tempFile := metaPath + ".new"
f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return uuid.Nil, errors.Wrapf(err, "failed to create Beat meta file at %s", tempFile)
}

encodeErr := json.NewEncoder(f).Encode(meta{UUID: newID})
err = f.Sync()
if err != nil {
return uuid.Nil, errors.Wrapf(err, "beat meta file at %s failed to write", tempFile)
}

err = f.Close()
if err != nil {
return uuid.Nil, errors.Wrapf(err, "beat meta file at %s failed to close", tempFile)
}

if encodeErr != nil {
return uuid.Nil, errors.Wrapf(err, "beat meta file at %s failed to write", tempFile)
}

// move temporary file into final location
err = file.SafeFileRotate(metaPath, tempFile)
if err != nil {
return uuid.Nil, errors.Wrapf(err, "error rotating file to %s", metaPath)
}
return newID, nil
}

// openRegular is a wrapper to handle a file based on a path
func openRegular(filename string) (*os.File, error) {
f, err := os.Open(filename)
if err != nil {
return f, errors.Wrapf(err, "error opening file %s", filename)
}

info, err := f.Stat()
if err != nil {
f.Close()
return nil, errors.Wrapf(err, "error statting %s", filename)
}

if !info.Mode().IsRegular() {
f.Close()
if info.IsDir() {
return nil, fmt.Errorf("%s is a directory", filename)
}
return nil, fmt.Errorf("%s is not a regular file", filename)
}

return f, nil
}

0 comments on commit 05c6121

Please sign in to comment.