Skip to content

Commit

Permalink
Merge branch 'master' into move-multiline-to-helper
Browse files Browse the repository at this point in the history
  • Loading branch information
jsirianni committed May 26, 2021
2 parents 3fc4bd5 + 34a85da commit ff98aa9
Show file tree
Hide file tree
Showing 9 changed files with 1,217 additions and 33 deletions.
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ ARTIFACTS = ${PROJECT_ROOT}/artifacts
ALL_MODULES := $(shell find . -type f -name "go.mod" -exec dirname {} \; | sort )


TOOLS_MOD_DIR := ./internal/tools
.PHONY: install-tools
install-tools:
go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
go install github.com/vektra/mockery/cmd/mockery@latest
cd $(TOOLS_MOD_DIR) && go install github.com/golangci/golangci-lint/cmd/golangci-lint
cd $(TOOLS_MOD_DIR) && go install github.com/vektra/mockery/cmd/mockery

.PHONY: test
test: vet test-only
Expand Down Expand Up @@ -44,7 +45,7 @@ listmod:

.PHONY: lint
lint:
golangci-lint run ./...
$$GOPATH/bin/golangci-lint run --timeout 2m0s --allow-parallel-runners ./...

.PHONY: vet
vet: check-missing-modules
Expand Down
15 changes: 15 additions & 0 deletions internal/tools/empty_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tools
8 changes: 8 additions & 0 deletions internal/tools/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/bluemedora/bpagent/internal/tools

go 1.16

require (
github.com/golangci/golangci-lint v1.40.1
github.com/vektra/mockery v1.1.2
)
1,062 changes: 1,062 additions & 0 deletions internal/tools/go.sum

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions internal/tools/tools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build tools

package tools

// This file follows the recommendation at
// https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module
// on how to pin tooling dependencies to a go.mod file.
// This ensures that all systems use the same version of tools in addition to regular dependencies.

import (
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/vektra/mockery"
)
57 changes: 27 additions & 30 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,7 @@ func (f *InputOperator) poll(ctx context.Context) {
}
}

// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(matches))
for _, path := range matches {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

readers := f.makeReaders(files)
readers := f.makeReaders(matches)
f.firstCheck = false

var wg sync.WaitGroup
Expand All @@ -154,8 +135,8 @@ func (f *InputOperator) poll(ctx context.Context) {
wg.Wait()

// Close all files
for _, file := range files {
file.Close()
for _, reader := range readers {
reader.Close()
}

f.saveCurrent(readers)
Expand Down Expand Up @@ -191,7 +172,26 @@ func getMatches(includes, excludes []string) []string {
// makeReaders takes a list of paths, then creates readers from each of those paths,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
func (f *InputOperator) makeReaders(filePaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(filePaths))
for _, path := range filePaths {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

// Get fingerprints for each file
fps := make([]*Fingerprint, 0, len(files))
for _, file := range files {
Expand All @@ -203,18 +203,15 @@ func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
fps = append(fps, fp)
}

// Make a copy of the files so we don't modify the original
filesCopy := make([]*os.File, len(files))
copy(filesCopy, files)

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
OUTER:
for i := 0; i < len(fps); {
fp := fps[i]
if len(fp.FirstBytes) == 0 {
files[i].Close()
// Empty file, don't read it until we can compare its fingerprint
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)

}

Expand All @@ -228,7 +225,7 @@ OUTER:
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)
continue OUTER
}
}
Expand All @@ -237,7 +234,7 @@ OUTER:

readers := make([]*Reader, 0, len(fps))
for i := 0; i < len(fps); i++ {
reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck)
reader, err := f.newReader(files[i], fps[i], f.firstCheck)
if err != nil {
f.Errorw("Failed to create reader", zap.Error(err))
continue
Expand Down
6 changes: 6 additions & 0 deletions operator/builtin/input/file/fingerprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const defaultFingerprintSize = 1000 // bytes
const minFingerprintSize = 16 // bytes

// Fingerprint is used to identify a file
// A file's fingerprint is the first N bytes of the file,
// where N is the fingerprintSize on the file_input operator
type Fingerprint struct {
FirstBytes []byte
}
Expand Down Expand Up @@ -42,6 +44,10 @@ func (f Fingerprint) Copy() *Fingerprint {

// StartsWith returns true if the fingerprints are the same
// or if the new fingerprint starts with the old one
// This is important functionality for tracking new files,
// since their initial size is typically less than that of
// a fingerprint. As the file grows, its fingerprint is updated
// until it reaches a maximum size, as configured on the operator
func (f Fingerprint) StartsWith(old *Fingerprint) bool {
l0 := len(old.FirstBytes)
if l0 == 0 {
Expand Down
63 changes: 63 additions & 0 deletions operator/builtin/input/file/fingerprint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package file

import (
"fmt"
"io/ioutil"
"math/rand"
"strings"
"testing"

Expand Down Expand Up @@ -200,4 +202,65 @@ func TestFingerprintStartsWith(t *testing.T) {
}
}

// Generates a file filled with many random bytes, then
// writes the same bytes to a second file, one byte at a time.
// Validates, after each byte is written, that fingerprint
// matching would successfully associate the two files.
// The static file can be thought of as the present state of
// the file, while each iteration of the growing file represents
// a possible state of the same file at a previous time.
func TestFingerprintStartsWith_FromFile(t *testing.T) {
r := rand.New(rand.NewSource(112358))

operator, _, tempDir := newTestFileOperator(t, nil, nil)
operator.fingerprintSize *= 10

fileLength := 12 * operator.fingerprintSize

// Make a []byte we can write one at a time
content := make([]byte, fileLength)
r.Read(content) // Fill slice with random bytes

// Overwrite some bytes with \n to ensure
// we are testing a file with multiple lines
newlineMask := make([]byte, fileLength)
r.Read(newlineMask) // Fill slice with random bytes
for i, b := range newlineMask {
if b == 0 && i != 0 { // 1/256 chance, but never first byte
content[i] = byte('\n')
}
}

fullFile, err := ioutil.TempFile(tempDir, "")
require.NoError(t, err)
defer fullFile.Close()

_, err = fullFile.Write(content)
require.NoError(t, err)

fff, err := operator.NewFingerprint(fullFile)
require.NoError(t, err)

partialFile, err := ioutil.TempFile(tempDir, "")
require.NoError(t, err)
defer partialFile.Close()

// Write the first byte before comparing, since empty files will never match
_, err = partialFile.Write(content[:1])
require.NoError(t, err)
content = content[1:]

// Write one byte at a time and validate that
// full fingerprint still starts with updated partial
for i := range content {
_, err = partialFile.Write(content[i:i])
require.NoError(t, err)

pff, err := operator.NewFingerprint(partialFile)
require.NoError(t, err)

require.True(t, fff.StartsWith(pff))
}
}

// TODO TestConfig (config_test.go) - sets defaults, errors appropriately, etc
5 changes: 5 additions & 0 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
}
}

// Close will close the file
func (f *Reader) Close() error {
return f.file.Close()
}

// Emit creates an entry with the decoded message and sends it to the next
// operator in the pipeline
func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {
Expand Down

0 comments on commit ff98aa9

Please sign in to comment.