Port two Harvester tests of log input to filestream in Golang #24190

Merged: 5 commits, Feb 25, 2021

Changes from 3 commits
3 changes: 3 additions & 0 deletions filebeat/input/filestream/fswatch.go
@@ -115,6 +115,9 @@ func defaultFileWatcherConfig() fileWatcherConfig {
func (w *fileWatcher) Run(ctx unison.Canceler) {
	defer close(w.events)

	// run an initial scan before starting the regular interval-based scans
	w.watch(ctx)

	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()
	for {
144 changes: 144 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
@@ -0,0 +1,144 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package filestream

import (
	"context"
	"runtime"
	"sync"
	"testing"

	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
	input "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/logp"
)

// test_close_renamed from test_harvester.py
func TestFilestreamCloseRenamed(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("renaming files while Filebeat is running is not supported on Windows")
	}
Reviewer:

Really? I wonder if the issue is due to us trying to write to the "original" path after the rotation.

Contributor Author:

I got this error when I ran os.Rename: "The process cannot access the file because it is being used by another process."

I don't think so, because the test tried to rename the file only after it had written lines to it. It rather fails because the filestream input keeps the file open while it is waiting for new events to show up. To test my theory, I closed the file on EOF; afterwards, I was able to rename the file on Windows as well.

Reviewer:

When restarting the input, is that detected as a rename? Can we create a test to validate the expected behavior?

Contributor Author:

If someone is able to rename a file on Windows while Filebeat is keeping it open, it is detected by the checker goroutine of logReader. So the reader is going to behave correctly.

> When restarting the input, is that detected as a rename? Can we create a test to validate the expected behavior?

What is the expected behaviour? If the input is restarted, it does not make sense to check whether a file was renamed, because close_renamed applies to open files that are renamed while Filebeat is running a Harvester for them.

When the input is restarted, it finds the state for the renamed file in the registry. Unless the path file_identity strategy is selected, it will not start a new Harvester for the file, because it is still the same file it encountered during the previous run.

These test cases are not special: we either already have tests for them, or we will once all tests of the log input are ported to filestream.

Reviewer:

I would still expect the metadata in the registry to be updated, meaning that the prospector detects that the file was renamed. Maybe it is rather worth a unit test for the prospector.

Contributor Author:

OK, I will address this in a separate PR.


	env := newInputTestingEnvironment(t)

	testlogName := "test.log"
	inp, err := (&loginp.InputManager{
		Logger:     logp.L(),
		StateStore: env.stateStore,
		Type:       pluginName,
		Configure:  configure,
	}).Create(common.MustNewConfigFrom(
		map[string]interface{}{
			"paths":                                []string{env.abspath(testlogName) + "*"},
			"prospector.scanner.check_interval":    "1ms",
			"close.on_state_change.check_interval": "1ms",
			"close.on_state_change.renamed":        "true",
		},
	))
	if err != nil {
		t.Fatalf("cannot create filestream input: %+v", err)
	}

	ctx, cancelInput := context.WithCancel(context.Background())
	inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		inp.Run(inputCtx, env.pipeline)
	}()

	testlines := []byte("first log line\n")
	env.mustWriteLinesToFile(testlogName, testlines)

	// wait until the first event has made it through successfully
	env.waitUntilEventCount(1)
	// check the offset in the registry
	env.requireOffsetInRegistry(testlogName, len(testlines))

	testlogNameRotated := "test.log.rotated"
	env.mustRenameFile(testlogName, testlogNameRotated)

	newerTestlines := []byte("new first log line\nnew second log line\n")
	env.mustWriteLinesToFile(testlogName, newerTestlines)

	// wait until the two new events have arrived
	env.waitUntilEventCount(3)

	cancelInput()
	wg.Wait()

	env.requireOffsetInRegistry(testlogNameRotated, len(testlines))
	env.requireOffsetInRegistry(testlogName, len(newerTestlines))
}

// test_close_eof from test_harvester.py
func TestFilestreamCloseEOF(t *testing.T) {
	env := newInputTestingEnvironment(t)

	testlogName := "test.log"
	inp, err := (&loginp.InputManager{
		Logger:     logp.L(),
		StateStore: env.stateStore,
		Type:       pluginName,
		Configure:  configure,
	}).Create(common.MustNewConfigFrom(
		map[string]interface{}{
			"paths":                             []string{env.abspath(testlogName)},
			"prospector.scanner.check_interval": "24h",
			"close.reader.on_eof":               "true",
		},
	))
	if err != nil {
		t.Fatalf("cannot create filestream input: %+v", err)
	}

	ctx, cancelInput := context.WithCancel(context.Background())
	inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		inp.Run(inputCtx, env.pipeline)
	}()

	testlines := []byte("first log line\n")
	expectedOffset := len(testlines)
	env.mustWriteLinesToFile(testlogName, testlines)

	// wait until the first event has made it through successfully
	env.waitUntilEventCount(1)
	// check the offset in the registry
	env.requireOffsetInRegistry(testlogName, expectedOffset)

	// the second log line will not be picked up as check_interval is set to one day
	env.mustWriteLinesToFile(testlogName, []byte("first line\nsecond log line\n"))

	// only one event is read
	env.waitUntilEventCount(1)

	cancelInput()
	wg.Wait()

	env.requireOffsetInRegistry(testlogName, expectedOffset)
}
219 changes: 219 additions & 0 deletions filebeat/input/filestream/testing.go
@@ -0,0 +1,219 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/statestore"
	"github.com/elastic/beats/v7/libbeat/statestore/storetest"
)

type inputTestingEnvironment struct {
	t          *testing.T
	workingDir string
	stateStore loginp.StateStore
	pipeline   *mockPipelineConnector
}

type registryEntry struct {
	Cursor struct {
		Offset int `json:"offset"`
	} `json:"cursor"`
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
	return &inputTestingEnvironment{
		t:          t,
		workingDir: t.TempDir(),
		stateStore: openTestStatestore(),
		pipeline:   &mockPipelineConnector{},
	}
}

func (e *inputTestingEnvironment) mustWriteLinesToFile(filename string, lines []byte) {
	path := e.abspath(filename)
	err := ioutil.WriteFile(path, lines, 0644)
	if err != nil {
		e.t.Fatalf("failed to write file '%s': %+v", path, err)
	}
}

func (e *inputTestingEnvironment) mustRenameFile(oldname, newname string) {
	err := os.Rename(e.abspath(oldname), e.abspath(newname))
	if err != nil {
		e.t.Fatalf("failed to rename file '%s': %+v", oldname, err)
	}
}

func (e *inputTestingEnvironment) abspath(filename string) string {
	return filepath.Join(e.workingDir, filename)
}

// requireOffsetInRegistry checks that the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expectedOffset int) {
	filepath := e.abspath(filename)
	fi, err := os.Stat(filepath)
	if err != nil {
		e.t.Fatalf("cannot stat file when checking for offset: %+v", err)
	}

	identifier, _ := newINodeDeviceIdentifier(nil)
	src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath})
	entry := e.getRegistryState(src.Name())

	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
}

func (e *inputTestingEnvironment) getRegistryState(key string) registryEntry {
	inputStore, _ := e.stateStore.Access()

	var entry registryEntry
	err := inputStore.Get(key, &entry)
	if err != nil {
		e.t.Fatalf("error when getting expected key '%s' from store: %+v", key, err)
	}

	return entry
}

// waitUntilEventCount waits until a total of count events have arrived at the clients.
func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
	for {
		sum := 0
		for _, c := range e.pipeline.clients {
			sum += len(c.GetEvents())
		}
		if sum == count {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

type testInputStore struct {
	registry *statestore.Registry
}

func openTestStatestore() loginp.StateStore {
	return &testInputStore{
		registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()),
	}
}

func (s *testInputStore) Close() {
	s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
	return s.registry.Get("filebeat")
}

func (s *testInputStore) CleanupInterval() time.Duration {
	return 24 * time.Hour
}

type mockClient struct {
	publishes  []beat.Event
	ackHandler beat.ACKer
	closed     bool
	mtx        sync.Mutex
}

// GetEvents returns the published events.
func (c *mockClient) GetEvents() []beat.Event {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	return c.publishes
}

// Publish mocks the Client Publish method.
func (c *mockClient) Publish(e beat.Event) {
	c.PublishAll([]beat.Event{e})
}

// PublishAll mocks the Client PublishAll method.
func (c *mockClient) PublishAll(events []beat.Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	for _, event := range events {
		c.publishes = append(c.publishes, event)
		c.ackHandler.AddEvent(event, true)
	}
	c.ackHandler.ACKEvents(len(events))
}

// Close mocks the Client Close method.
func (c *mockClient) Close() error {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	if c.closed {
		return fmt.Errorf("mock client already closed")
	}

	c.closed = true
	return nil
}

// mockPipelineConnector mocks the PipelineConnector interface.
type mockPipelineConnector struct {
	clients []*mockClient
	mtx     sync.Mutex
}

// GetAllEvents returns all events associated with a pipeline.
func (pc *mockPipelineConnector) GetAllEvents() []beat.Event {
	var evList []beat.Event
	for _, client := range pc.clients {
		evList = append(evList, client.GetEvents()...)
	}

	return evList
}

// Connect mocks the PipelineConnector Connect method.
func (pc *mockPipelineConnector) Connect() (beat.Client, error) {
	return pc.ConnectWith(beat.ClientConfig{})
}

// ConnectWith mocks the PipelineConnector ConnectWith method.
func (pc *mockPipelineConnector) ConnectWith(config beat.ClientConfig) (beat.Client, error) {
	pc.mtx.Lock()
	defer pc.mtx.Unlock()

	c := &mockClient{
		ackHandler: config.ACKHandler,
	}

	pc.clients = append(pc.clients, c)

	return c, nil
}