Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For #526 - move filtering earlier #599

Merged
merged 3 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions code/sonalyze/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func localAnalysis(cmd SampleAnalysisCommand, _ io.Reader, stdout, stderr io.Wri
return err
}

hostGlobber, recordFilter, err := buildFilters(cmd, cfg, args.Verbose)
hostGlobber, recordFilter, err := buildRecordFilters(cmd, cfg, args.Verbose)
if err != nil {
return fmt.Errorf("Failed to create record filter\n%w", err)
}
Expand All @@ -47,6 +47,7 @@ func localAnalysis(cmd SampleAnalysisCommand, _ io.Reader, stdout, stderr io.Wri
args.FromDate,
args.ToDate,
hostGlobber,
recordFilter,
args.Verbose,
)
if err != nil {
Expand All @@ -57,7 +58,7 @@ func localAnalysis(cmd SampleAnalysisCommand, _ io.Reader, stdout, stderr io.Wri
UstrStats(stderr, false)
}

sonarlog.ComputeAndFilter(streams, recordFilter)
sonarlog.ComputePerSampleFields(streams)
err = cmd.Perform(stdout, cfg, theLog, streams, bounds, hostGlobber, recordFilter)

if err != nil {
Expand All @@ -67,11 +68,11 @@ func localAnalysis(cmd SampleAnalysisCommand, _ io.Reader, stdout, stderr io.Wri
return nil
}

func buildFilters(
func buildRecordFilters(
cmd SampleAnalysisCommand,
cfg *config.ClusterConfig,
verbose bool,
) (*hostglob.HostGlobber, func(*sonarlog.Sample) bool, error) {
) (*hostglob.HostGlobber, db.SampleFilter, error) {
args := cmd.SharedFlags()

// Temporary limitation.
Expand Down Expand Up @@ -184,24 +185,25 @@ func buildFilters(
}
}

// Record filtering logic is the same for all commands.
// Record filtering logic is the same for all commands. The record filter can use only raw
// ingested data, it can be applied at any point in the pipeline. It *must* be thread-safe.

excludeSystemJobs := args.RecordFilterArgs.ExcludeSystemJobs
haveFrom := args.SourceArgs.HaveFrom
haveTo := args.SourceArgs.HaveTo
from := args.SourceArgs.FromDate.Unix()
to := args.SourceArgs.ToDate.Unix()
recordFilter := func(e *sonarlog.Sample) bool {
return (len(includeUsers) == 0 || includeUsers[e.S.User]) &&
(includeHosts.IsEmpty() || includeHosts.Match(e.S.Host.String())) &&
(len(includeJobs) == 0 || includeJobs[e.S.Job]) &&
(len(includeCommands) == 0 || includeCommands[e.S.Cmd]) &&
!excludeUsers[e.S.User] &&
!excludeJobs[e.S.Job] &&
!excludeCommands[e.S.Cmd] &&
(!excludeSystemJobs || e.S.Pid >= 1000) &&
(!haveFrom || from <= e.S.Timestamp) &&
(!haveTo || e.S.Timestamp <= to)
recordFilter := func(e *db.Sample) bool {
return (len(includeUsers) == 0 || includeUsers[e.User]) &&
(includeHosts.IsEmpty() || includeHosts.Match(e.Host.String())) &&
(len(includeJobs) == 0 || includeJobs[e.Job]) &&
(len(includeCommands) == 0 || includeCommands[e.Cmd]) &&
!excludeUsers[e.User] &&
!excludeJobs[e.Job] &&
!excludeCommands[e.Cmd] &&
(!excludeSystemJobs || e.Pid >= 1000) &&
(!haveFrom || from <= e.Timestamp) &&
(!haveTo || e.Timestamp <= to)
}

if verbose {
Expand Down Expand Up @@ -245,5 +247,5 @@ func buildFilters(
}
}

return includeHosts, recordFilter, nil
return includeHosts, (db.SampleFilter)(recordFilter), nil
}
2 changes: 1 addition & 1 deletion code/sonalyze/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type SampleAnalysisCommand interface {
streams sonarlog.InputStreamSet,
bounds sonarlog.Timebounds,
hostGlobber *hostglob.HostGlobber,
recordFilter func(*sonarlog.Sample) bool,
recordFilter db.SampleFilter,
) error

// Retrieve configfile for those commands that allow it, otherwise "", or "" for absent
Expand Down
31 changes: 31 additions & 0 deletions code/sonalyze/db/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,37 @@ import (
. "sonalyze/common"
)

// The db.SampleFilter will be applied to individual records and must return true for records to be
// included and false for all others.
//
// The db.SampleFilter is never nil. (Nil would be meaningful in some corner cases but generally it
// will almost never be nil in practical situations and remembering the extra nil check is just
// annoying.)
//
// HARD REQUIREMENT: The SampleFilter *must* be thread-safe and *should* be non-locking and
// non-contending; if it refers to shared resources then the resources should be
// shared-read-without-locking by all threads. The filter can be applied at any point in the
// ingestion pipeline.
//
// Notes:
//
// - Go maps are safe for concurrent read access without locking and can be used by the SampleFilter
// with that restriction. From https://go.dev/doc/faq#atomic_maps:
//
// Map access is unsafe only when updates are occurring. As long as all goroutines are only
// reading—looking up elements in the map, including iterating through it using a for range
// loop—and not changing the map by assigning to elements or doing deletions, it is safe for
// them to access the map concurrently without synchronization.
//
// Almost certainly, the program must be careful to establish a happens-before relationship
// between map initialization and all map reads for this to be true. Since the map will likely be
// broadcast to a bunch of goroutines this will often happen as a matter of course.
//
// - Sonalyze hostglobbers are thread-safe (but not always contention-free due to shared state in
// the regex engine, impact TBD.)

type SampleFilter func (*Sample) bool

// Read a stream of Sonar data records, parse them and return them in order. Returns the number of
// benign errors, and non-nil error if non-benign error.
//
Expand Down
2 changes: 2 additions & 0 deletions code/sonalyze/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ type JobsCommand struct /* implements SampleAnalysisCommand */ {
minRuntimeStr string
}

var _ SampleAnalysisCommand = (*JobsCommand)(nil)

func (_ *JobsCommand) Summary() []string {
return []string{
"Select jobs by various criteria and present aggregate information",
Expand Down
2 changes: 1 addition & 1 deletion code/sonalyze/jobs/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (jc *JobsCommand) Perform(
streams sonarlog.InputStreamSet,
bounds sonarlog.Timebounds,
hostGlobber *hostglob.HostGlobber,
_ func(*sonarlog.Sample) bool,
_ db.SampleFilter,
) error {
if jc.Verbose {
Log.Infof("Streams constructed by postprocessing: %d", len(streams))
Expand Down
2 changes: 2 additions & 0 deletions code/sonalyze/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type LoadCommand struct /* implements SampleAnalysisCommand */ {
printOpts *FormatOptions
}

var _ SampleAnalysisCommand = (*LoadCommand)(nil)

func (_ *LoadCommand) Summary() []string {
return []string{
"Compute aggregate system load across various timeframes based on sample",
Expand Down
2 changes: 1 addition & 1 deletion code/sonalyze/load/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (lc *LoadCommand) Perform(
streams sonarlog.InputStreamSet,
bounds sonarlog.Timebounds,
hostGlobber *hostglob.HostGlobber,
_ func(*sonarlog.Sample) bool,
_ db.SampleFilter,
) error {
fromIncl, toIncl := lc.InterpretFromToWithBounds(bounds)

Expand Down
4 changes: 3 additions & 1 deletion code/sonalyze/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type MetadataCommand struct /* implements SampleAnalysisCommand */ {
printOpts *FormatOptions
}

var _ SampleAnalysisCommand = (*MetadataCommand)(nil)

func (_ *MetadataCommand) Summary() []string {
return []string{
"Display metadata about the sample streams in the database.",
Expand Down Expand Up @@ -118,7 +120,7 @@ func (mdc *MetadataCommand) Perform(
streams sonarlog.InputStreamSet,
bounds sonarlog.Timebounds,
hostGlobber *hostglob.HostGlobber,
_ func(*sonarlog.Sample) bool,
_ db.SampleFilter,
) error {
if mdc.Times {
fmt.Fprintf(out, "From: %s\n", mdc.FromDate.Format(time.RFC3339))
Expand Down
19 changes: 8 additions & 11 deletions code/sonalyze/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type ParseCommand struct /* implements SampleAnalysisCommand */ {
printOpts *FormatOptions
}

var _ SampleAnalysisCommand = (*ParseCommand)(nil)

func (_ *ParseCommand) Summary() []string {
return []string{
"Export sample data in various formats, after optional preprocessing.",
Expand Down Expand Up @@ -93,7 +95,7 @@ func (pc *ParseCommand) Perform(
streams sonarlog.InputStreamSet,
bounds sonarlog.Timebounds,
hostGlobber *hostglob.HostGlobber,
recordFilter func(*sonarlog.Sample) bool,
recordFilter db.SampleFilter,
) error {
var mergedSamples []*sonarlog.SampleStream
var samples sonarlog.SampleStream
Expand All @@ -106,20 +108,15 @@ func (pc *ParseCommand) Perform(
if err != nil {
return err
}
// Simulate the normal pipeline
mapped := slices.Map(

// Simulate the normal pipeline, the recordFilter application is expected by the user.
samples = sonarlog.SampleStream(slices.FilterMap(
records,
recordFilter,
func(r *db.Sample) sonarlog.Sample {
return sonarlog.Sample{S: r}
},
)
// We still need to filter the records or things will be very confusing
if recordFilter != nil {
mapped = slices.Filter(mapped, func (s sonarlog.Sample) bool {
return recordFilter(&s)
})
}
samples = sonarlog.SampleStream(mapped)
))

case pc.Clean:
mergedSamples = maps.Values(streams)
Expand Down
2 changes: 1 addition & 1 deletion code/sonalyze/profile/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (pc *ProfileCommand) Perform(
streams sonarlog.InputStreamSet,
_ sonarlog.Timebounds,
_ *hostglob.HostGlobber,
_ func(*sonarlog.Sample) bool,
_ db.SampleFilter,
) error {
jobId := pc.Job[0]

Expand Down
2 changes: 2 additions & 0 deletions code/sonalyze/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type ProfileCommand struct /* implements SampleAnalysisCommand */ {
testNoMemory bool
}

var _ SampleAnalysisCommand = (*ProfileCommand)(nil)

func (_ *ProfileCommand) Summary() []string {
return []string{
"Print profile information for one aspect of a particular job.",
Expand Down
3 changes: 2 additions & 1 deletion code/sonalyze/sonarlog/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func ReadSampleStreams(
c db.SampleCluster,
fromDate, toDate time.Time,
hostGlobber *hostglob.HostGlobber,
recordFilter db.SampleFilter,
verbose bool,
) (
streams InputStreamSet,
Expand All @@ -38,7 +39,7 @@ func ReadSampleStreams(
return
}
read = len(samples)
streams, bounds = createInputStreams(samples)
streams, bounds = createInputStreams(samples, recordFilter)
return
}

Expand Down
Loading