Skip to content

Commit

Permalink
adding max_time to ordering criteria mtime
Browse files Browse the repository at this point in the history
  • Loading branch information
lokesh.balla committed Apr 14, 2024
1 parent ad71436 commit 00a15b1
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 73 deletions.
18 changes: 15 additions & 3 deletions pkg/stanza/fileconsumer/matcher/internal/filter/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func SortTemporal(regexKey string, ascending bool, layout string, location strin
)
}

type mtimeSortOption struct{}
type mtimeSortOption struct {
hasLimit bool
maxTime time.Duration
}

type mtimeItem struct {
mtime time.Time
Expand All @@ -151,6 +154,12 @@ func (m mtimeSortOption) apply(items []*item) ([]*item, error) {
continue
}

// drop all files that are older than max time
modTime := fi.ModTime()
if m.hasLimit && time.Since(modTime) >= m.maxTime {
continue
}

mtimeItems = append(mtimeItems, mtimeItem{
mtime: fi.ModTime(),
path: path,
Expand All @@ -171,6 +180,9 @@ func (m mtimeSortOption) apply(items []*item) ([]*item, error) {
return filteredValues, errs
}

func SortMtime() Option {
return mtimeSortOption{}
func SortMtime(duration time.Duration) Option {
return mtimeSortOption{
hasLimit: !(duration.Seconds() == 0),
maxTime: duration,
}
}
82 changes: 81 additions & 1 deletion pkg/stanza/fileconsumer/matcher/internal/filter/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,87 @@ func TestMTimeFilter(t *testing.T) {
items = append(items, it)
}

f := SortMtime()
f := SortMtime(0)
result, err := f.apply(items)
if tc.expectedErr != "" {
require.EqualError(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
}

relativeResult := []string{}
for _, r := range result {
rel, err := filepath.Rel(tmpDir, r.value)
require.NoError(t, err)
relativeResult = append(relativeResult, rel)
}

require.Equal(t, tc.expect, relativeResult)
})
}
}

func TestMTimeFilterWithLimit(t *testing.T) {
now := time.Now()
cases := []struct {
name string
files []string
fileMTimes []time.Time
expectedErr string
expect []string
}{
{
name: "No files",
files: []string{},
fileMTimes: []time.Time{},
expect: []string{},
},
{
name: "Single file",
files: []string{"a.log"},
fileMTimes: []time.Time{now},
expect: []string{"a.log"},
},
{
name: "single file among Multiple files",
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{now, now.Add(-time.Hour)},
expect: []string{"a.log"},
},
{
name: "Multiple files in less than 10 sec",
files: []string{"a.log", "b.log", "c.log"},
fileMTimes: []time.Time{now.Add(-time.Second), now.Add(-time.Second * 2), now},
expect: []string{"c.log", "a.log", "b.log"},
},
{
name: "Multiple files with few less than 10 sec",
files: []string{"a.log", "b.log", "c.log", "d.log", "e.log"},
fileMTimes: []time.Time{now.Add(-time.Second), now.Add(-time.Second * 2), now, now.Add(-time.Minute), now.Add(-time.Hour)},
expect: []string{"c.log", "a.log", "b.log"},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tmpDir := t.TempDir()
items := []*item{}
// Create files with specified mtime
for i, file := range tc.files {
mtime := tc.fileMTimes[i]
fullPath := filepath.Join(tmpDir, file)

f, err := os.Create(fullPath)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, os.Chtimes(fullPath, now, mtime))

it, err := newItem(fullPath, nil)
require.NoError(t, err)

items = append(items, it)
}

f := SortMtime(time.Minute)
result, err := f.apply(items)
if tc.expectedErr != "" {
require.EqualError(t, err, tc.expectedErr)
Expand Down
6 changes: 5 additions & 1 deletion pkg/stanza/fileconsumer/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"regexp"
"time"

"go.opentelemetry.io/collector/featuregate"

Expand Down Expand Up @@ -52,6 +53,9 @@ type Sort struct {
// Timestamp only
Layout string `mapstructure:"layout,omitempty"`
Location string `mapstructure:"location,omitempty"`

// mtime only
MaxTime time.Duration `mapstructure:"max_time,omitempty"`
}

func New(c Criteria) (*Matcher, error) {
Expand Down Expand Up @@ -119,7 +123,7 @@ func New(c Criteria) (*Matcher, error) {
if !mtimeSortTypeFeatureGate.IsEnabled() {
return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime)
}
filterOpts = append(filterOpts, filter.SortMtime())
filterOpts = append(filterOpts, filter.SortMtime(sc.MaxTime))
default:
return nil, fmt.Errorf("'sort_type' must be specified")
}
Expand Down
104 changes: 52 additions & 52 deletions pkg/stanza/operator/parser/syslog/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {

ts := time.Now()

var cases = []Case{
cases := []Case{
{
"RFC3164SkipPriAbsent",
func() *Config {
Expand Down Expand Up @@ -125,12 +125,12 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
Timestamp: time.Date(ts.Year(), ts.Month(), ts.Day(), ts.Hour(), ts.Minute(), ts.Second(), 0, location["utc"]),
Severity: entry.Error2,
SeverityText: "crit",
Attributes: map[string]interface{}{
"syslog_appname": "apache_server",
"syslog_facility": 4,
"host": "1.2.3.4",
"message": "test message",
"syslog_priority": 34,
Attributes: map[string]any{
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
Body: fmt.Sprintf("<34>%s 1.2.3.4 apache_server: test message", ts.Format("Jan _2 15:04:05")),
},
Expand All @@ -152,12 +152,12 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
Timestamp: time.Date(ts.Year(), ts.Month(), ts.Day(), ts.Hour(), ts.Minute(), ts.Second(), 0, location["detroit"]),
Severity: entry.Error2,
SeverityText: "crit",
Attributes: map[string]interface{}{
"syslog_appname": "apache_server",
"syslog_facility": 4,
"host": "1.2.3.4",
"message": "test message",
"syslog_priority": 34,
Attributes: map[string]any{
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
Body: fmt.Sprintf("<34>%s 1.2.3.4 apache_server: test message", ts.Format("Jan _2 15:04:05")),
},
Expand All @@ -179,12 +179,12 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
Timestamp: time.Date(ts.Year(), ts.Month(), ts.Day(), ts.Hour(), ts.Minute(), ts.Second(), 0, location["athens"]),
Severity: entry.Error2,
SeverityText: "crit",
Attributes: map[string]interface{}{
"syslog_appname": "apache_server",
"syslog_facility": 4,
"host": "1.2.3.4",
"message": "test message",
"syslog_priority": 34,
Attributes: map[string]any{
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
Body: fmt.Sprintf("<34>%s 1.2.3.4 apache_server: test message", ts.Format("Jan _2 15:04:05")),
},
Expand All @@ -205,23 +205,23 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
Timestamp: time.Date(2015, 8, 5, 21, 58, 59, 693000000, time.UTC),
Severity: entry.Info,
SeverityText: "info",
Attributes: map[string]interface{}{
"syslog_appname": "SecureAuth0",
"syslog_facility": 10,
"host": "192.168.2.132",
"message": "Found the user for retrieving user's profile",
"syslog_msg_id": "ID52020",
"syslog_priority": 86,
"syslog_proc_id": "23108",
"syslog_structured_data": map[string]interface{}{
"SecureAuth@27389": map[string]interface{}{
Attributes: map[string]any{
"appname": "SecureAuth0",
"facility": 10,
"hostname": "192.168.2.132",
"message": "Found the user for retrieving user's profile",
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]any{
"SecureAuth@27389": map[string]any{
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
"UserID": "Tester2",
},
},
"syslog_version": 1,
"version": 1,
},
Body: `<86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020 [SecureAuth@27389 UserHostAddress="192.168.2.132" Realm="SecureAuth0" UserID="Tester2" PEN="27389"] Found the user for retrieving user's profile`,
},
Expand Down Expand Up @@ -317,23 +317,23 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
Timestamp: time.Date(2015, 8, 5, 21, 58, 59, 693000000, time.UTC),
Severity: entry.Info,
SeverityText: "info",
Attributes: map[string]interface{}{
"syslog_appname": "SecureAuth0",
"syslog_facility": 10,
"host": "192.168.2.132",
"message": "Found the user for retrieving user's profile",
"syslog_msg_id": "ID52020",
"syslog_priority": 86,
"syslog_proc_id": "23108",
"syslog_structured_data": map[string]interface{}{
"SecureAuth@27389": map[string]interface{}{
Attributes: map[string]any{
"appname": "SecureAuth0",
"facility": 10,
"hostname": "192.168.2.132",
"message": "Found the user for retrieving user's profile",
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]any{
"SecureAuth@27389": map[string]any{
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
"UserID": "Tester2",
},
},
"syslog_version": 1,
"version": 1,
},
Body: `215 <86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020 [SecureAuth@27389 UserHostAddress="192.168.2.132" Realm="SecureAuth0" UserID="Tester2" PEN="27389"] Found the user for retrieving user's profile`,
},
Expand All @@ -355,23 +355,23 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
Timestamp: time.Date(2015, 8, 5, 21, 58, 59, 693000000, time.UTC),
Severity: entry.Info,
SeverityText: "info",
Attributes: map[string]interface{}{
"syslog_appname": "SecureAuth0",
"syslog_facility": 10,
"host": "192.168.2.132",
"message": "Found the user for retrieving user's profile",
"syslog_msg_id": "ID52020",
"syslog_priority": 86,
"syslog_proc_id": "23108",
"syslog_structured_data": map[string]interface{}{
"SecureAuth@27389": map[string]interface{}{
Attributes: map[string]any{
"appname": "SecureAuth0",
"facility": 10,
"hostname": "192.168.2.132",
"message": "Found the user for retrieving user's profile",
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]any{
"SecureAuth@27389": map[string]any{
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
"UserID": "Tester2",
},
},
"syslog_version": 1,
"version": 1,
},
Body: nonTransparentBody,
},
Expand Down
32 changes: 16 additions & 16 deletions pkg/stanza/operator/parser/syslog/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ func (p *Parser) shouldSkipPriorityValues(value []byte) bool {
// parseRFC3164 will parse an RFC3164 syslog message.
func (p *Parser) parseRFC3164(syslogMessage *rfc3164.SyslogMessage, skipPriHeaderValues bool) (map[string]any, error) {
value := map[string]any{
"timestamp": syslogMessage.Timestamp,
"host": syslogMessage.Hostname,
"syslog_appname": syslogMessage.Appname,
"syslog_proc_id": syslogMessage.ProcID,
"syslog_msg_id": syslogMessage.MsgID,
"message": syslogMessage.Message,
"timestamp": syslogMessage.Timestamp,
"hostname": syslogMessage.Hostname,
"appname": syslogMessage.Appname,
"proc_id": syslogMessage.ProcID,
"msg_id": syslogMessage.MsgID,
"message": syslogMessage.Message,
}

if !skipPriHeaderValues {
Expand All @@ -151,14 +151,14 @@ func (p *Parser) parseRFC3164(syslogMessage *rfc3164.SyslogMessage, skipPriHeade
// parseRFC5424 will parse an RFC5424 syslog message.
func (p *Parser) parseRFC5424(syslogMessage *rfc5424.SyslogMessage, skipPriHeaderValues bool) (map[string]any, error) {
value := map[string]any{
"timestamp": syslogMessage.Timestamp,
"host": syslogMessage.Hostname,
"syslog_appname": syslogMessage.Appname,
"syslog_proc_id": syslogMessage.ProcID,
"syslog_msg_id": syslogMessage.MsgID,
"message": syslogMessage.Message,
"syslog_structured_data": syslogMessage.StructuredData,
"syslog_version": syslogMessage.Version,
"timestamp": syslogMessage.Timestamp,
"hostname": syslogMessage.Hostname,
"appname": syslogMessage.Appname,
"proc_id": syslogMessage.ProcID,
"msg_id": syslogMessage.MsgID,
"message": syslogMessage.Message,
"structured_data": syslogMessage.StructuredData,
"version": syslogMessage.Version,
}

if !skipPriHeaderValues {
Expand Down Expand Up @@ -255,7 +255,7 @@ var severityText = [...]string{
7: "debug",
}

var severityField = entry.NewAttributeField("syslog_severity")
var severityField = entry.NewAttributeField("severity")

func cleanupTimestamp(e *entry.Entry) error {
_, ok := entry.NewAttributeField("timestamp").Delete(e)
Expand Down Expand Up @@ -316,4 +316,4 @@ func newNonTransparentFramingParseFunc(trailerType nontransparent.TrailerType) p
parser.Parse(reader)
return
}
}
}

0 comments on commit 00a15b1

Please sign in to comment.