Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][log] Enable status reporter for log input #40075

Merged
merged 24 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ec8ce0b
chore: initial commit, without tests
VihasMakwana Jul 2, 2024
7286a1d
chore: tests
VihasMakwana Jul 2, 2024
cff6785
chore: add test cases
VihasMakwana Jul 4, 2024
bef95bf
fix: add null check
VihasMakwana Jul 4, 2024
ec66f08
fix: remove println
VihasMakwana Jul 4, 2024
facd3a0
fix: lint
VihasMakwana Jul 4, 2024
05dfbac
goimports
VihasMakwana Jul 4, 2024
0f00f04
remove println
VihasMakwana Jul 4, 2024
dbdb6ea
Merge branch 'main' into add-log-status-reporter
VihasMakwana Jul 4, 2024
035619f
fix: changelog
VihasMakwana Jul 5, 2024
99b5625
update test for windows
VihasMakwana Jul 5, 2024
bf26bf3
fix: fix some comments
VihasMakwana Jul 11, 2024
2bf5257
chore: add starting state in NewInput
VihasMakwana Jul 11, 2024
f6b1f15
Merge branch 'main' into add-log-status-reporter
pierrehilbert Jul 11, 2024
ecd36a0
Merge branch 'main' into add-log-status-reporter
VihasMakwana Jul 11, 2024
965c285
fix: add sample output to verify the status
VihasMakwana Jul 11, 2024
616f40e
fix: remove println
VihasMakwana Jul 11, 2024
37bf52c
Merge branch 'main' into add-log-status-reporter
VihasMakwana Jul 12, 2024
687fda7
Merge branch 'main' into add-log-status-reporter
VihasMakwana Jul 12, 2024
9453b09
fix: add integration tag
VihasMakwana Jul 12, 2024
b7b7e1f
Merge branch 'main' into add-log-status-reporter
VihasMakwana Jul 15, 2024
142ff3e
Update CHANGELOG.next.asciidoc
VihasMakwana Jul 16, 2024
e027831
fix: remove redundant bool
VihasMakwana Jul 16, 2024
25e6488
fix: add degraded
VihasMakwana Jul 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163]

Expand Down
31 changes: 21 additions & 10 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand All @@ -48,12 +49,13 @@ type Input interface {

// Runner encapsulate the lifecycle of the input
type Runner struct {
config inputConfig
input Input
done chan struct{}
wg *sync.WaitGroup
Once bool
beatDone chan struct{}
config inputConfig
input Input
done chan struct{}
wg *sync.WaitGroup
Once bool
beatDone chan struct{}
statusReporter status.StatusReporter
rdner marked this conversation as resolved.
Show resolved Hide resolved
}

// New instantiates a new Runner
Expand Down Expand Up @@ -83,10 +85,11 @@ func New(
}

context := Context{
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
GetStatusReporter: input.GetStatusReporter,
}
var ipt Input
ipt, err = f(conf, connector, context)
Expand Down Expand Up @@ -164,3 +167,11 @@ func (p *Runner) stop() {
func (p *Runner) String() string {
return fmt.Sprintf("input [type=%s]", p.config.Type)
}

func (p *Runner) SetStatusReporter(statusReporter status.StatusReporter) {
p.statusReporter = statusReporter
}

func (p *Runner) GetStatusReporter() status.StatusReporter {
return p.statusReporter
}
rdner marked this conversation as resolved.
Show resolved Hide resolved
20 changes: 20 additions & 0 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Input struct {
meta map[string]string
stopOnce sync.Once
fileStateIdentifier file.StateIdentifier
getStatusReporter input.GetStatusReporter
rdner marked this conversation as resolved.
Show resolved Hide resolved
}

// NewInput instantiates a new Log
Expand Down Expand Up @@ -157,8 +159,11 @@ func NewInput(
done: context.Done,
meta: meta,
fileStateIdentifier: identifier,
getStatusReporter: context.GetStatusReporter,
}

p.updateStatus(status.Starting, "starting the log input")

// Create empty harvester to check if configs are fine
// TODO: Do config validation instead
_, err = p.createHarvester(logger, file.State{}, nil)
Expand Down Expand Up @@ -224,6 +229,9 @@ func (p *Input) loadStates(states []file.State) error {

// Run runs the input
func (p *Input) Run() {
// Mark it Running for now.
// Any errors encountered in this loop will change state to degraded
p.updateStatus(status.Running, "")
logger := p.logger
logger.Debug("Start next scan")

Expand Down Expand Up @@ -558,6 +566,7 @@ func (p *Input) scan() {
continue
}
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err))
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
rdner marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf(harvesterErrMsg, newState.Source, err)
}
} else {
Expand All @@ -583,6 +592,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, oldState.Offset)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
}
return
Expand All @@ -593,6 +603,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, 0)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

Expand Down Expand Up @@ -833,3 +844,12 @@ func (p *Input) stopWhenDone() {

p.Wait()
}

func (p *Input) updateStatus(status status.Status, msg string) {
if p.getStatusReporter == nil {
return
}
if reporter := p.getStatusReporter(); reporter != nil {
reporter.UpdateStatus(status, msg)
}
}
12 changes: 8 additions & 4 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

type GetStatusReporter func() status.StatusReporter

type Context struct {
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
GetStatusReporter GetStatusReporter
rdner marked this conversation as resolved.
Show resolved Hide resolved
}

// Factory is used to register functions creating new Input instances.
Expand Down
205 changes: 205 additions & 0 deletions x-pack/filebeat/tests/integration/status_reporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration

package integration

import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/libbeat/common/reload"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/x-pack/filebeat/cmd"
"github.com/elastic/beats/v7/x-pack/libbeat/management"
"github.com/elastic/beats/v7/x-pack/libbeat/management/tests"

conf "github.com/elastic/elastic-agent-libs/config"
)

func TestLogStatusReporter(t *testing.T) {
unitOneID := mock.NewID()
unitOutID := mock.NewID()
token := mock.NewID()

tests.InitBeatsForTest(t, cmd.Filebeat())
tmpDir := t.TempDir()
filename := fmt.Sprintf("test-%d", time.Now().Unix())
outPath := filepath.Join(tmpDir, filename)
t.Logf("writing output to file %s", outPath)
err := os.Mkdir(outPath, 0775)
require.NoError(t, err)

/*
* valid input stream, shouldn't raise any error.
*/
inputStream := getInputStream(unitOneID, filepath.Join(tmpDir, "*.log"), 2)
require.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test.log"), []byte("Line1\nLine2\nLine3\n"), 0777))
/*
* try to open an irregular file.
* This should throw "Tried to open non regular file:" and status to degraded
*/
nullDeviceFile := "/dev/null"
if runtime.GOOS == "windows" {
nullDeviceFile = "NUL"
}
inputStreamIrregular := getInputStream(unitOneID, nullDeviceFile, 1)

outputExpectedStream := proto.UnitExpected{
Id: unitOutID,
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
Config: &proto.UnitExpectedConfig{
Type: "file",
Source: tests.RequireNewStruct(map[string]interface{}{
"type": "file",
"enabled": true,
"path": outPath,
"filename": "beat-out",
"number_of_files": 7,
}),
},
}

observedStates := make(chan *proto.CheckinObserved)
expectedUnits := make(chan []*proto.UnitExpected)
done := make(chan struct{})
// V2 mock server
server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
select {
case observedStates <- observed:
return &proto.CheckinExpected{
Units: <-expectedUnits,
}
case <-done:
return nil
}
},
ActionImpl: func(response *proto.ActionResponse) error {
return nil
},
}
require.NoError(t, server.Start())
defer server.Stop()

// start the client
client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{
Name: "program",
}, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) {
c := management.DefaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, err
}
return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits)
})

go func() {
t.Logf("Running beats...")
err := cmd.Filebeat().Execute()
require.NoError(t, err)
}()

scenarios := []struct {
expectedStatus proto.State
nextInputunit *proto.UnitExpected
}{
{
proto.State_HEALTHY,
&inputStreamIrregular,
},
{
proto.State_DEGRADED,
&inputStream,
},
{
proto.State_HEALTHY,
&inputStream,
},
// wait for one more checkin, just to be sure it's healthy
{
proto.State_HEALTHY,
&inputStream,
},
}

timer := time.NewTimer(2 * time.Minute)
id := 0
for id < len(scenarios) {
select {
case observed := <-observedStates:
state := extractState(observed.GetUnits(), unitOneID)
expectedUnits <- []*proto.UnitExpected{
scenarios[id].nextInputunit,
&outputExpectedStream,
}
if state != scenarios[id].expectedStatus {
continue
}
// always ensure that output is healthy
outputState := extractState(observed.GetUnits(), unitOutID)
require.Equal(t, outputState, proto.State_HEALTHY)
Comment on lines +153 to +158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little detail here, the way the code is written, the output status is only checked if the current state is the desired state, was that your intention? The comment made me understand you wanted to check the output status on every iteration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the output status is only checked if the current state is the desired state, was that your intention?

Yes. I wanted to make sure that output is in healthy state.


timer.Reset(2 * time.Minute)
id++
case <-timer.C:
t.Fatal("timeout waiting for checkin")
default:
}
}
require.Eventually(t, func() bool {
events := tests.ReadLogLines(t, outPath)
return events > 0 // wait until we see one output event
}, 15*time.Second, 1*time.Second)
}

func extractState(units []*proto.UnitObserved, idx string) proto.State {
for _, unit := range units {
if unit.Id == idx {
return unit.GetState()
}
}
return -1
}

func getInputStream(id string, path string, stateIdx int) proto.UnitExpected {
return proto.UnitExpected{
Id: id,
Type: proto.UnitType_INPUT,
ConfigStateIdx: uint64(stateIdx),
State: proto.State_HEALTHY,
Config: &proto.UnitExpectedConfig{
Streams: []*proto.Stream{{
Id: "filebeat/log-default-system",
Source: tests.RequireNewStruct(map[string]interface{}{
"enabled": true,
"symlinks": true,
"type": "log",
"paths": []interface{}{path},
"scan_frequency": "500ms",
}),
}},
Type: "log",
Id: "log-input-test",
Name: "log-1",
Revision: 1,
},
}
}
Loading