Skip to content

Commit

Permalink
Merge branch '7.x' into backport_24343_7.x
Browse files Browse the repository at this point in the history
  • Loading branch information
michalpristas authored Mar 6, 2021
2 parents 4db3c6c + 8100dd1 commit c2bfd64
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 24 deletions.
19 changes: 9 additions & 10 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/timed"
"github.com/elastic/go-concert/unison"
)

Expand Down Expand Up @@ -115,16 +116,14 @@ func defaultFileWatcherConfig() fileWatcherConfig {
func (w *fileWatcher) Run(ctx unison.Canceler) {
defer close(w.events)

ticker := time.NewTicker(w.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
w.watch(ctx)
}
}
// run initial scan before starting regular
w.watch(ctx)

timed.Periodic(ctx, w.interval, func() error {
w.watch(ctx)

return nil
})
}

func (w *fileWatcher) watch(ctx unison.Canceler) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- Fix: Successfully installed and enrolled agent running standalone{pull}[24128]24128
- Make installer atomic on windows {pull}[24253]24253
- Fix windows installer during enroll {pull}[24343]24343
- Fix capabilities resolution in inspect command {pull}[24346]24346

==== New features

Expand Down
10 changes: 5 additions & 5 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func buildEnrollArgs(token string, policyID string) ([]string, error) {
args = append(args, "--fleet-server-cert-key", certKey)
}
if envBool("FLEET_SERVER_INSECURE_HTTP") {
args = append(args, "--fleet-server--insecure-http")
args = append(args, "--fleet-server-insecure-http")
args = append(args, "--insecure")
}
} else {
Expand All @@ -235,8 +235,8 @@ func buildEnrollArgs(token string, policyID string) ([]string, error) {

func buildFleetServerConnStr() (string, error) {
host := envWithDefault(defaultESHost, "FLEET_SERVER_ELASTICSEARCH_HOST", "ELASTICSEARCH_HOST")
username := envWithDefault(defaultUsername, "FLEET_SERVER_ELASTICSEARCH_USERNAME", "$ELASTICSEARCH_USERNAME")
password := envWithDefault(defaultPassword, "FLEET_SERVER_ELASTICSEARCH_PASSWORD", "$ELASTICSEARCH_PASSWORD")
username := envWithDefault(defaultUsername, "FLEET_SERVER_ELASTICSEARCH_USERNAME", "ELASTICSEARCH_USERNAME")
password := envWithDefault(defaultPassword, "FLEET_SERVER_ELASTICSEARCH_PASSWORD", "ELASTICSEARCH_PASSWORD")
u, err := url.Parse(host)
if err != nil {
return "", err
Expand Down Expand Up @@ -289,8 +289,8 @@ func kibanaFetchToken(client *kibana.Client, policy *kibanaPolicy, streams *cli.

func kibanaClient() (*kibana.Client, error) {
host := envWithDefault(defaultKibanaHost, "KIBANA_FLEET_HOST", "KIBANA_HOST")
username := envWithDefault(defaultUsername, "KIBANA_FLEET_USERNAME", "KIBANA_USERNAME", "$ELASTICSEARCH_USERNAME")
password := envWithDefault(defaultPassword, "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "$ELASTICSEARCH_PASSWORD")
username := envWithDefault(defaultUsername, "KIBANA_FLEET_USERNAME", "KIBANA_USERNAME", "ELASTICSEARCH_USERNAME")
password := envWithDefault(defaultPassword, "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "ELASTICSEARCH_PASSWORD")
return kibana.NewClientWithConfig(&kibana.ClientConfig{
Host: host,
Username: username,
Expand Down
20 changes: 16 additions & 4 deletions x-pack/elastic-agent/pkg/capabilities/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,23 @@ func inputsMap(cfgInputs interface{}, l *logger.Logger) []map[string]interface{}

inputsMap := make([]map[string]interface{}, 0, len(inputsSet))
for _, s := range inputsSet {
mm, ok := s.(map[string]interface{})
if !ok {
switch mm := s.(type) {
case map[string]interface{}:
inputsMap = append(inputsMap, mm)
case map[interface{}]interface{}:
newMap := make(map[string]interface{})
for k, v := range mm {
key, ok := k.(string)
if !ok {
continue
}

newMap[key] = v
}
inputsMap = append(inputsMap, newMap)
default:
continue
}
inputsMap = append(inputsMap, mm)
}

return inputsMap
Expand Down Expand Up @@ -188,7 +200,7 @@ func (c *multiInputsCapability) Apply(in interface{}) (interface{}, error) {

inputsMap, err = c.cleanupInput(inputsMap)
if err != nil {
c.log.Errorf("cleaning up config object failed for capability 'multi-outputs': %v", err)
c.log.Errorf("cleaning up config object failed for capability 'multi-inputs': %v", err)
return in, nil
}

Expand Down
39 changes: 35 additions & 4 deletions x-pack/elastic-agent/pkg/capabilities/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,21 @@ func (c *multiOutputsCapability) cleanupOutput(cfgMap map[string]interface{}) (m
return cfgMap, nil
}

outputsMap, ok := outputsIface.(map[string]interface{})
if !ok {
switch outputsMap := outputsIface.(type) {
case map[string]interface{}:
handleOutputMapStr(outputsMap)
cfgMap[outputKey] = outputsMap
case map[interface{}]interface{}:
handleOutputMapIface(outputsMap)
cfgMap[outputKey] = outputsMap
default:
return nil, fmt.Errorf("outputs must be a map")
}

return cfgMap, nil
}

func handleOutputMapStr(outputsMap map[string]interface{}) {
for outputName, outputIface := range outputsMap {
acceptValue := true

Expand All @@ -208,7 +218,28 @@ func (c *multiOutputsCapability) cleanupOutput(cfgMap map[string]interface{}) (m

delete(outputMap, conditionKey)
}
}

cfgMap[outputKey] = outputsMap
return cfgMap, nil
func handleOutputMapIface(outputsMap map[interface{}]interface{}) {
for outputName, outputIface := range outputsMap {
acceptValue := true

outputMap, ok := outputIface.(map[interface{}]interface{})
if ok {
conditionIface, found := outputMap[conditionKey]
if found {
conditionVal, ok := conditionIface.(bool)
if ok {
acceptValue = conditionVal
}
}
}

if !acceptValue {
delete(outputsMap, outputName)
continue
}

delete(outputMap, conditionKey)
}
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/capabilities/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ type multiUpgradeCapability struct {
func (c *multiUpgradeCapability) Apply(in interface{}) (interface{}, error) {
upgradeMap := upgradeObject(in)
if upgradeMap == nil {
c.log.Warnf("expecting map config object but got nil for capability 'multi-outputs'")
c.log.Warnf("expecting map config object but got nil for capability 'multi-upgrade'")
// not an upgrade we don't alter origin
return in, nil
}
Expand Down

0 comments on commit c2bfd64

Please sign in to comment.