Skip to content

Commit

Permalink
Move O365audit input to v2 input API (#19719)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jul 8, 2020
1 parent aa60a58 commit 03bedd7
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 460 deletions.
1 change: 0 additions & 1 deletion x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion x-pack/filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
)

func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
Expand All @@ -20,5 +21,7 @@ func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin
}

func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
return []v2.Plugin{}
return []v2.Plugin{
o365audit.Plugin(log, store),
}
}
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/o365audit/contentblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type contentBlob struct {
env apiEnvironment
id, url string
// cursor is used to ACK the resulting events.
cursor cursor
cursor checkpoint
// skipLines is used when resuming from a saved cursor so that already
// acknowledged objects are not duplicated.
skipLines int
Expand Down Expand Up @@ -115,7 +115,7 @@ func (c contentBlob) handleError(response *http.Response) (actions []poll.Action
}

// ContentBlob creates a new contentBlob.
func ContentBlob(url string, cursor cursor, env apiEnvironment) contentBlob {
func ContentBlob(url string, cursor checkpoint, env apiEnvironment) contentBlob {
return contentBlob{
url: url,
env: env,
Expand Down
37 changes: 22 additions & 15 deletions x-pack/filebeat/input/o365audit/contentblob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -21,9 +22,15 @@ type contentStore struct {
stopped bool
}

func (s *contentStore) onEvent(b beat.Event) bool {
var errStopped = errors.New("stopped")

func (s *contentStore) onEvent(b beat.Event, checkpointUpdate interface{}) error {
b.Private = checkpointUpdate
s.events = append(s.events, b)
return !s.stopped
if s.stopped {
return errStopped
}
return nil
}

func (f *fakePoll) BlobContent(t testing.TB, b poll.Transaction, data []common.MapStr, nextUrl string) poll.Transaction {
Expand All @@ -41,7 +48,7 @@ func makeEvent(ts time.Time, id string) common.MapStr {
}
}

func validateBlobs(t testing.TB, store contentStore, expected []string, c cursor) cursor {
func validateBlobs(t testing.TB, store contentStore, expected []string, c checkpoint) checkpoint {
assert.Len(t, store.events, len(expected))
for idx := range expected {
id, err := getString(store.events[idx].Fields, fieldsPrefix+".Id")
Expand All @@ -51,14 +58,14 @@ func validateBlobs(t testing.TB, store contentStore, expected []string, c cursor
assert.Equal(t, expected[idx], id)
}
prev := c
baseLine := c.line
baseLine := c.Line
for idx, id := range expected {
ev := store.events[idx]
cursor, ok := ev.Private.(cursor)
cursor, ok := ev.Private.(checkpoint)
if !assert.True(t, ok) {
t.Fatal("no cursor for event id", id)
}
assert.Equal(t, idx+1+baseLine, cursor.line)
assert.Equal(t, idx+1+baseLine, cursor.Line)
assert.True(t, prev.Before(cursor))
prev = cursor
}
Expand All @@ -72,7 +79,7 @@ func TestContentBlob(t *testing.T) {
Logger: logp.L(),
Callback: store.onEvent,
}
baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now())
baseCursor := checkpoint{Timestamp: time.Now()}
query := ContentBlob("http://test.localhost/", baseCursor, ctx)
data := []common.MapStr{
makeEvent(now.Add(-time.Hour), "e1"),
Expand All @@ -85,17 +92,17 @@ func TestContentBlob(t *testing.T) {
next := f.BlobContent(t, query, data, "")
assert.Nil(t, next)
c := validateBlobs(t, store, expected, baseCursor)
assert.Equal(t, len(expected), c.line)
assert.Equal(t, len(expected), c.Line)
}

func TestContentBlobResumeToLine(t *testing.T) {
var f fakePoll
var store contentStore
ctx := testConfig()
ctx.Callback = store.onEvent
baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now())
baseCursor := checkpoint{Timestamp: time.Now()}
const skip = 3
baseCursor.line = skip
baseCursor.Line = skip
query := ContentBlob("http://test.localhost/", baseCursor, ctx).WithSkipLines(skip)
data := []common.MapStr{
makeEvent(now.Add(-time.Hour), "e1"),
Expand All @@ -108,7 +115,7 @@ func TestContentBlobResumeToLine(t *testing.T) {
next := f.BlobContent(t, query, data, "")
assert.Nil(t, next)
c := validateBlobs(t, store, expected, baseCursor)
assert.Equal(t, len(expected), c.line-skip)
assert.Equal(t, len(expected), c.Line-skip)
}

func TestContentBlobPaged(t *testing.T) {
Expand All @@ -118,7 +125,7 @@ func TestContentBlobPaged(t *testing.T) {
Logger: logp.L(),
Callback: store.onEvent,
}
baseCursor := newCursor(stream{"myTenant", "contentype"}, time.Now())
baseCursor := checkpoint{Timestamp: time.Now()}
query := ContentBlob("http://test.localhost/", baseCursor, ctx)
data := []common.MapStr{
makeEvent(now.Add(-time.Hour), "e1"),
Expand All @@ -133,17 +140,17 @@ func TestContentBlobPaged(t *testing.T) {
assert.NotNil(t, next)
assert.IsType(t, paginator{}, next)
c := validateBlobs(t, store, expected, baseCursor)
assert.Equal(t, 3, c.line)
assert.Equal(t, 3, c.Line)
store.events = nil
next = f.BlobContent(t, next, data[3:5], "http://test.localhost/page/3")
assert.IsType(t, paginator{}, next)
expected = []string{"e4", "e5"}
c = validateBlobs(t, store, expected, c)
assert.Equal(t, 5, c.line)
assert.Equal(t, 5, c.Line)
store.events = nil
next = f.BlobContent(t, next, data[5:], "")
assert.Nil(t, next)
expected = []string{"e6"}
c = validateBlobs(t, store, expected, c)
assert.Equal(t, 6, c.line)
assert.Equal(t, 6, c.Line)
}
19 changes: 0 additions & 19 deletions x-pack/filebeat/input/o365audit/dates.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package o365audit

import (
"fmt"
"sort"
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -79,23 +77,6 @@ func getDateKey(m common.MapStr, key string, formats dateFormats) (t time.Time,
return formats.Parse(str)
}

// Sort a slice of maps by one of its keys parsed as a date in the given format(s).
func sortMapSliceByDate(s []common.MapStr, dateKey string, formats dateFormats) error {
var errs multierror.Errors
sort.Slice(s, func(i, j int) bool {
di, e1 := getDateKey(s[i], dateKey, formats)
dj, e2 := getDateKey(s[j], dateKey, formats)
if e1 != nil {
errs = append(errs, e1)
}
if e2 != nil {
errs = append(errs, e2)
}
return di.Before(dj)
})
return errors.Wrapf(errs.Err(), "failed sorting by date key:%s", dateKey)
}

func inRange(d, maxLimit time.Duration) bool {
if maxLimit < 0 {
maxLimit = -maxLimit
Expand Down
Loading

0 comments on commit 03bedd7

Please sign in to comment.