diff --git a/code/sonalyze/analysis.go b/code/sonalyze/analysis.go index e8040bb6..1abf84e0 100644 --- a/code/sonalyze/analysis.go +++ b/code/sonalyze/analysis.go @@ -5,6 +5,7 @@ package main import ( "fmt" "io" + "math" "os" "go-utils/config" @@ -73,7 +74,7 @@ func buildRecordFilters( cmd SampleAnalysisCommand, cfg *config.ClusterConfig, verbose bool, -) (*hostglob.HostGlobber, db.SampleFilter, error) { +) (*hostglob.HostGlobber, *db.SampleFilter, error) { args := cmd.SharedFlags() // Temporary limitation. @@ -192,19 +193,30 @@ func buildRecordFilters( excludeSystemJobs := args.RecordFilterArgs.ExcludeSystemJobs haveFrom := args.SourceArgs.HaveFrom haveTo := args.SourceArgs.HaveTo - from := args.SourceArgs.FromDate.Unix() - to := args.SourceArgs.ToDate.Unix() - recordFilter := func(e *db.Sample) bool { - return (len(includeUsers) == 0 || includeUsers[e.User]) && - (includeHosts.IsEmpty() || includeHosts.Match(e.Host.String())) && - (len(includeJobs) == 0 || includeJobs[e.Job]) && - (len(includeCommands) == 0 || includeCommands[e.Cmd]) && - !excludeUsers[e.User] && - !excludeJobs[e.Job] && - !excludeCommands[e.Cmd] && - (!excludeSystemJobs || e.Pid >= 1000) && - (!haveFrom || from <= e.Timestamp) && - (!haveTo || e.Timestamp <= to) + var from int64 = 0 + if haveFrom { + from = args.SourceArgs.FromDate.Unix() + } + var to int64 = math.MaxInt64 + if haveTo { + to = args.SourceArgs.ToDate.Unix() + } + var minPid uint32 + if excludeSystemJobs { + minPid = 1000 + } + + var recordFilter = &db.SampleFilter{ + IncludeUsers: includeUsers, + IncludeHosts: includeHosts, + IncludeJobs: includeJobs, + IncludeCommands: includeCommands, + ExcludeUsers: excludeUsers, + ExcludeJobs: excludeJobs, + ExcludeCommands: excludeCommands, + MinPid: minPid, + From: from, + To: to, } if verbose { @@ -248,5 +260,5 @@ func buildRecordFilters( } } - return includeHosts, (db.SampleFilter)(recordFilter), nil + return includeHosts, recordFilter, nil } diff --git a/code/sonalyze/command/command.go b/code/sonalyze/command/command.go index e555ac0e..4ce30c29 100644 --- a/code/sonalyze/command/command.go +++ b/code/sonalyze/command/command.go @@ -90,7 +90,7 @@ type SampleAnalysisCommand interface { streams sonarlog.InputStreamSet, bounds sonarlog.Timebounds, hostGlobber *hostglob.HostGlobber, - recordFilter db.SampleFilter, + recordFilter *db.SampleFilter, ) error // Retrieve configfile for those commands that allow it, otherwise "", or "" for absent diff --git a/code/sonalyze/db/samples.go b/code/sonalyze/db/samples.go index c7ad1d4d..fe34d819 100644 --- a/code/sonalyze/db/samples.go +++ b/code/sonalyze/db/samples.go @@ -10,40 +10,11 @@ import ( "slices" "time" + "go-utils/hostglob" + . "sonalyze/common" ) -// The db.SampleFilter will be applied to individual records and must return true for records to be -// included and false for all others. -// -// The db.SampleFilter is never nil. (Nil would be meaningful in some corner cases but generally it -// will almost never be nil in practical situations and remembering the extra nil check is just -// annoying.) -// -// HARD REQUIREMENT: The SampleFilter *must* be thread-safe and *should* be non-locking and -// non-contending; if it refers to shared resources then the resources should be -// shared-read-without-locking by all threads. The filter can be applied at any point in the -// ingestion pipeline. -// -// Notes: -// -// - Go maps are safe for concurrent read access without locking and can be used by the SampleFilter -// with that restriction. From https://go.dev/doc/faq#atomic_maps: -// -// Map access is unsafe only when updates are occurring. As long as all goroutines are only -// reading—looking up elements in the map, including iterating through it using a for range -// loop—and not changing the map by assigning to elements or doing deletions, it is safe for -// them to access the map concurrently without synchronization. -// -// Almost certainly, the program must be careful to establish a happens-before relationship -// between map initialization and all map reads for this to be true. Since the map will likely be -// broadcast to a bunch of goroutines this will often happen as a matter of course. -// -// - Sonalyze hostglobbers are thread-safe (but not always contention-free due to shared state in -// the regex engine, impact TBD.) - -type SampleFilter func (*Sample) bool - // Read a stream of Sonar data records, parse them and return them in order. Returns the number of // benign errors, and non-nil error if non-benign error. // @@ -556,3 +527,301 @@ LineLoop: err = nil return } + +// The db.SampleFilter will be applied to individual records and must be true for records to be +// included and false for all others. It may be applied at any point in the ingestion pipeline. +// +// The fields may all have zero values; however, if `To` is not a precise time then it should be set +// to math.MaxInt64, as zero will mean zero here too. +// +// Maps should only carry `true` values for present elements - these are sets. + +type SampleFilter struct { + IncludeUsers map[Ustr]bool + IncludeHosts *hostglob.HostGlobber + IncludeJobs map[uint32]bool + IncludeCommands map[Ustr]bool + ExcludeUsers map[Ustr]bool + ExcludeJobs map[uint32]bool + ExcludeCommands map[Ustr]bool + MinPid uint32 + From int64 + To int64 +} + +// Construct an efficient filter function from the sample filter. +// +// This is a convenience function for external code that has a SampleFilter but needs to have a +// function for filtering. If the DB layer were to do any filtering by itself it would be under no +// obligation to use this. +// +// The recordFilter or its parts *MUST NOT* be modified by the caller subsequently, as parts of it +// may be retained by the filter function and may be used on concurrent threads. For all practical +// purposes, the returned function takes (shared) ownership of the recordFilter. The returned +// function however does not update the recordFilter or its part. +// +// The returned function will be thread-safe and mostly non-locking and non-contending. +// +// Notes: +// +// - Go maps are safe for concurrent read access without locking and can be used by the SampleFilter +// with that restriction. From https://go.dev/doc/faq#atomic_maps: +// +// Map access is unsafe only when updates are occurring. As long as all goroutines are only +// reading—looking up elements in the map, including iterating through it using a for range +// loop—and not changing the map by assigning to elements or doing deletions, it is safe for +// them to access the map concurrently without synchronization. +// +// Almost certainly, the program must be careful to establish a happens-before relationship +// between map initialization and all map reads for this to be true. Since the map will likely be +// broadcast to a bunch of goroutines this will often happen as a matter of course. +// +// - The Sonalyze HostGlobber is thread-safe (but not always contention-free due to shared state in +// the regex engine, impact TBD.) +// +// The filter has a number of components, many of which are empty in most cases. Filtering can be a +// major expense, so performance is important. There are numerous options: +// +// - Simple conjunction of filters in their most straightforward state. +// +// - Ditto, but with fast-checkable flags for each case to avoid doing work, probably nil checks in +// the implementation will serve the same function as these flags in most cases though, so this +// would likely be very modest gain at best, over the previous case. +// +// - Special implementations for common or important cases, and a default (one of the above) for the +// rest, eg, filters that filter "only" on one job ID. This is surprisingly hard due to how the +// filters are used in practice to filter eg heartbeat records and from/to ranges: no filters +// actually filter just on one thing. +// +// - A list of closures or a nested closure, one closure for each attribute to filter by, avoiding +// code altogether for absent cases. This adds call overhead per case but may still be a win +// because many cases will not be tested for. +// +// - A bytecode representation of active filters so that there is no closure call overhead and no +// redundant tests, but we add dispatch overhead per case again. +// +// It is also beneficial to specialize for sets-of-size-one to avoid hashing and lookup overhead, +// this specialization is possible for the later implementations in the list. + +// This is the canonical code, preserved here for posterity. +/* +func InstantiateSampleFilter0(recordFilter *SampleFilter) func(*Sample) bool { + return func(e *Sample) bool { + return (len(recordFilter.IncludeUsers) == 0 || recordFilter.IncludeUsers[e.User]) && + (recordFilter.IncludeHosts == nil || + recordFilter.IncludeHosts.IsEmpty() || + recordFilter.IncludeHosts.Match(e.Host.String())) && + (len(recordFilter.IncludeJobs) == 0 || recordFilter.IncludeJobs[e.Job]) && + (len(recordFilter.IncludeCommands) == 0 || recordFilter.IncludeCommands[e.Cmd]) && + !recordFilter.ExcludeUsers[e.User] && + !recordFilter.ExcludeJobs[e.Job] && + !recordFilter.ExcludeCommands[e.Cmd] && + e.Pid >= recordFilter.MinPid && + recordFilter.From <= e.Timestamp && + e.Timestamp <= recordFilter.To + } +} +*/ + +// Simle bytecode interpreter. For typical filters this is much faster than the above code, some +// sample queries against two months of Fox data run in about half the time. + +func InstantiateSampleFilter(recordFilter *SampleFilter) func(*Sample) bool { + // The filter is a simple bytecode interpreter so as to avoid redundancies and easily specialize + // fast cases. Tests are ordered from most to least likely and most to least discriminating. + // + // All instructions are 64-bit ints, sometimes there is a payload. This structure allows us to + // use a ranged `for` loop to avoid bounds checking, program counters, and end-of-program + // testing. + // + // Tests that are almost always done - from/to filtering here - are outside the loop to avoid + // the pointless dispatch overhead. + // + // TODO: A further optimization here is to merge common pairs of operations to avoid dispatch + // overhead - for example, a typical filter is to include by Job ID and exclude by the + // "_heartbeat_" command name. This type of merging would only require one operand. + // Alternatively we could specialize pairs with two operands if both operands fit in the 59 bits + // available (basically always). Implementing this optimization amounts to running a peephole + // optimizer on the generated instructions. + + const ( + testIncludeSingleJob uint64 = iota + testIncludeJobs + testIncludeSingleUser + testIncludeUsers + testIncludeHosts + testIncludeSingleCommand + testIncludeCommands + testExcludeSingleUser + testExcludeUsers + testExcludeSingleJob + testExcludeJobs + testExcludeSingleCommand + testExcludeCommands + testExcludeLowPids + ) + + const ( + // Opcode in low 5 bits + opMask = 31 + + // Operand in high 32 bits, leaving 27 free bits in the middle + opShift = 32 + ) + + instr := make([]uint64, 0) + + switch len(recordFilter.IncludeJobs) { + case 0: + case 1: + var theJob uint32 + for j := range recordFilter.IncludeJobs { + theJob = j + } + instr = append(instr, testIncludeSingleJob|uint64(theJob)< 0 { + instr = append(instr, testExcludeLowPids|uint64(recordFilter.MinPid)<> opShift) + if e.Job != job { + return false + } + case testIncludeJobs: + if !recordFilter.IncludeJobs[e.Job] { + return false + } + case testIncludeSingleUser: + uid := Ustr(uint32(op >> opShift)) + if e.User != uid { + return false + } + case testIncludeUsers: + if !recordFilter.IncludeUsers[e.User] { + return false + } + case testIncludeHosts: + if !recordFilter.IncludeHosts.Match(e.Host.String()) { + return false + } + case testIncludeSingleCommand: + cmd := Ustr(uint32(op >> opShift)) + if e.Cmd != cmd { + return false + } + case testIncludeCommands: + if !recordFilter.IncludeCommands[e.Cmd] { + return false + } + case testExcludeSingleUser: + uid := Ustr(uint32(op >> opShift)) + if e.User == uid { + return false + } + case testExcludeUsers: + if recordFilter.ExcludeUsers[e.User] { + return false + } + case testExcludeSingleJob: + job := uint32(op >> opShift) + if e.Job == job { + return false + } + case testExcludeJobs: + if recordFilter.ExcludeJobs[e.Job] { + return false + } + case testExcludeSingleCommand: + cmd := Ustr(uint32(op >> opShift)) + if e.Cmd == cmd { + return false + } + case testExcludeCommands: + if recordFilter.ExcludeCommands[e.Cmd] { + return false + } + case testExcludeLowPids: + pid := uint32(op >> opShift) + if e.Pid < pid { + return false + } + } + } + + // For any selection of note, these will always have to be run, but they will almost always + // pass due to the structure of the database. So apply them only at the end. + return recordFilter.From <= e.Timestamp && e.Timestamp <= recordFilter.To + } +} diff --git a/code/sonalyze/jobs/perform.go b/code/sonalyze/jobs/perform.go index c8693857..5947821f 100644 --- a/code/sonalyze/jobs/perform.go +++ b/code/sonalyze/jobs/perform.go @@ -90,7 +90,7 @@ func (jc *JobsCommand) Perform( streams sonarlog.InputStreamSet, bounds sonarlog.Timebounds, hostGlobber *hostglob.HostGlobber, - _ db.SampleFilter, + _ *db.SampleFilter, ) error { if jc.Verbose { Log.Infof("Streams constructed by postprocessing: %d", len(streams)) diff --git a/code/sonalyze/load/perform.go b/code/sonalyze/load/perform.go index b16f8d16..344f2391 100644 --- a/code/sonalyze/load/perform.go +++ b/code/sonalyze/load/perform.go @@ -22,7 +22,7 @@ func (lc *LoadCommand) Perform( streams sonarlog.InputStreamSet, bounds sonarlog.Timebounds, hostGlobber *hostglob.HostGlobber, - _ db.SampleFilter, + _ *db.SampleFilter, ) error { fromIncl, toIncl := lc.InterpretFromToWithBounds(bounds) diff --git a/code/sonalyze/metadata/metadata.go b/code/sonalyze/metadata/metadata.go index f1e939a3..6a943351 100644 --- a/code/sonalyze/metadata/metadata.go +++ b/code/sonalyze/metadata/metadata.go @@ -124,7 +124,7 @@ func (mdc *MetadataCommand) Perform( streams sonarlog.InputStreamSet, bounds sonarlog.Timebounds, // for mdc.Bounds only hostGlobber *hostglob.HostGlobber, - _ db.SampleFilter, + _ *db.SampleFilter, ) error { if mdc.Times { fmt.Fprintf(out, "From: %s\n", mdc.FromDate.Format(time.RFC3339)) diff --git a/code/sonalyze/parse/parse.go b/code/sonalyze/parse/parse.go index da9ff841..48563c5e 100644 --- a/code/sonalyze/parse/parse.go +++ b/code/sonalyze/parse/parse.go @@ -99,7 +99,7 @@ func (pc *ParseCommand) Perform( streams sonarlog.InputStreamSet, bounds sonarlog.Timebounds, // for pc.MergeByJob only hostGlobber *hostglob.HostGlobber, - recordFilter db.SampleFilter, + recordFilter *db.SampleFilter, ) error { var mergedSamples []*sonarlog.SampleStream var samples sonarlog.SampleStream @@ -118,7 +118,7 @@ func (pc *ParseCommand) Perform( for _, records := range recordBlobs { mapped = append(mapped, uslices.FilterMap( records, - recordFilter, + db.InstantiateSampleFilter(recordFilter), func(r *db.Sample) sonarlog.Sample { return sonarlog.Sample{S: r} }, diff --git a/code/sonalyze/profile/perform.go b/code/sonalyze/profile/perform.go index 8a16e248..cd7502ec 100644 --- a/code/sonalyze/profile/perform.go +++ b/code/sonalyze/profile/perform.go @@ -25,7 +25,7 @@ func (pc *ProfileCommand) Perform( streams sonarlog.InputStreamSet, _ sonarlog.Timebounds, _ *hostglob.HostGlobber, - _ db.SampleFilter, + _ *db.SampleFilter, ) error { jobId := pc.Job[0] diff --git a/code/sonalyze/sonarlog/ingest.go b/code/sonalyze/sonarlog/ingest.go index 6b05f5f2..51747585 100644 --- a/code/sonalyze/sonarlog/ingest.go +++ b/code/sonalyze/sonarlog/ingest.go @@ -26,7 +26,7 @@ func ReadSampleStreamsAndMaybeBounds( c db.SampleCluster, fromDate, toDate time.Time, hostGlobber *hostglob.HostGlobber, - recordFilter db.SampleFilter, + recordFilter *db.SampleFilter, wantBounds bool, verbose bool, ) ( diff --git a/code/sonalyze/sonarlog/postprocess.go b/code/sonalyze/sonarlog/postprocess.go index ece7ed25..1a3ecb84 100644 --- a/code/sonalyze/sonarlog/postprocess.go +++ b/code/sonalyze/sonarlog/postprocess.go @@ -89,12 +89,14 @@ func standardSampleRectifier(xs []*db.Sample, cfg *config.ClusterConfig) []*db.S func createInputStreams( entryBlobs [][]*db.Sample, - recordFilter db.SampleFilter, + recordFilter *db.SampleFilter, wantBounds bool, ) (InputStreamSet, Timebounds) { streams := make(InputStreamSet) bounds := make(Timebounds) + filterFunc := db.InstantiateSampleFilter(recordFilter) + // Reconstruct the individual sample streams. Each job with job id 0 gets its own stream, these // must not be merged downstream (they get different stream IDs but the job IDs remain 0). // @@ -117,7 +119,7 @@ func createInputStreams( } } - if !recordFilter(e) { + if !filterFunc(e) { continue } diff --git a/code/sonalyze/sonarlog/postprocess_test.go b/code/sonalyze/sonarlog/postprocess_test.go index 4489711d..6a812adf 100644 --- a/code/sonalyze/sonarlog/postprocess_test.go +++ b/code/sonalyze/sonarlog/postprocess_test.go @@ -1,6 +1,7 @@ package sonarlog import ( + "math" "os" "reflect" "testing" @@ -83,11 +84,18 @@ func TestPostprocessLogCpuUtilPct(t *testing.T) { t.Fatalf("Expected 2 discarded, got %d", discarded) } - root := StringToUstr("root") - filter := func(r *db.Sample) bool { - return r.User != root - } - streams, _ := createInputStreams([][]*db.Sample{entries}, filter, false) + streams, _ := createInputStreams( + [][]*db.Sample{ + entries, + }, + &db.SampleFilter{ + ExcludeUsers: map[Ustr]bool{ + StringToUstr("root"): true, + }, + To: math.MaxInt64, + }, + false, + ) ComputePerSampleFields(streams) if len(streams) != 4 { diff --git a/code/sonalyze/uptime/perform.go b/code/sonalyze/uptime/perform.go index e162eea9..71f0fa44 100644 --- a/code/sonalyze/uptime/perform.go +++ b/code/sonalyze/uptime/perform.go @@ -92,7 +92,7 @@ func (uc *UptimeCommand) Perform( streams sonarlog.InputStreamSet, bounds sonarlog.Timebounds, hostGlobber *hostglob.HostGlobber, - _ db.SampleFilter, + _ *db.SampleFilter, ) error { samples := slices.CatenateP(maps.Values(streams)) if uc.Verbose {