From 2a5b247ab3a0599950095d6951b2c9d78ba0e7f2 Mon Sep 17 00:00:00 2001
From: Lars T Hansen
Date: Mon, 16 Sep 2024 14:02:43 +0200
Subject: [PATCH] For #526 - Change DB APIs to return slice-of-slice with
 shared data

---
 code/sonalyze/db/clusterstore.go           | 45 ++++++++----
 code/sonalyze/db/clusterstore_test.go      | 81 ++++++++++++++------
 code/sonalyze/db/persistentcluster.go      | 12 ++--
 code/sonalyze/db/sacctfile.go              |  2 +-
 code/sonalyze/db/samplefile.go             |  4 +-
 code/sonalyze/db/sysinfofile.go            |  2 +-
 code/sonalyze/db/transientcluster.go       |  6 +-
 code/sonalyze/parse/parse.go               | 22 +++---
 code/sonalyze/sacct/perform.go             |  5 +-
 code/sonalyze/sonarlog/ingest.go           | 16 +++--
 code/sonalyze/sonarlog/postprocess.go      | 80 +++++++++++----------
 code/sonalyze/sonarlog/postprocess_test.go | 16 +++--
 12 files changed, 180 insertions(+), 111 deletions(-)

diff --git a/code/sonalyze/db/clusterstore.go b/code/sonalyze/db/clusterstore.go
index 0381a8ce..ea49e90f 100644
--- a/code/sonalyze/db/clusterstore.go
+++ b/code/sonalyze/db/clusterstore.go
@@ -105,20 +105,22 @@ type SampleCluster interface {
     ) ([]string, error)
 
     // Read `ps` samples from all the files selected by SampleFilenames() and extract the
-    // per-process sample data. Times must be UTC.
+    // per-process sample data. Times must be UTC. The inner slices of the result, and the
+    // records they point to, must not be mutated in any way.
     ReadSamples(
         fromDate, toDate time.Time,
         hosts *hostglob.HostGlobber,
         verbose bool,
-    ) (samples []*Sample, dropped int, err error)
+    ) (sampleBlobs [][]*Sample, dropped int, err error)
 
     // Read `ps` samples from all the files selected by SampleFilenames() and extract the load data.
-    // Times must be UTC.
+    // Times must be UTC. The inner slices of the result, and the records they point to, must not
+    // be mutated in any way.
     ReadLoadData(
         fromDate, toDate time.Time,
         hosts *hostglob.HostGlobber,
         verbose bool,
-    ) (data []*LoadDatum, dropped int, err error)
+    ) (dataBlobs [][]*LoadDatum, dropped int, err error)
 }
 
 // A SysinfoCluster can provide `sonar sysinfo` data: per-system hardware configuration data.
@@ -133,11 +135,13 @@ type SysinfoCluster interface {
     ) ([]string, error)
 
     // Read `sysinfo` records from all the files selected by SysinfoFilenames(). Times must be UTC.
+    // The inner slices of the result, and the records they point to, must not be mutated in any
+    // way.
     ReadSysinfo(
         fromDate, toDate time.Time,
         hosts *hostglob.HostGlobber,
         verbose bool,
-    ) (records []*config.NodeConfigRecord, dropped int, err error)
+    ) (sysinfoBlobs [][]*config.NodeConfigRecord, dropped int, err error)
 }
 
 // There is no HostGlobber here, as the sacct data are not mostly node-oriented. Any analysis
@@ -152,10 +156,12 @@ type SacctCluster interface {
     ) ([]string, error)
 
     // Read `sacct` records from all the files selected by SacctFilenames(). Times must be UTC.
+    // The inner slices of the result, and the records they point to, must not be mutated in any
+    // way.
     ReadSacctData(
         fromDate, toDate time.Time,
         verbose bool,
-    ) (records []*SacctInfo, dropped int, err error)
+    ) (recordBlobs [][]*SacctInfo, dropped int, err error)
 }
 
 // An AppendableCluster (not yet well developed, this could be split into appending different types
@@ -389,9 +395,16 @@ func init() {
     }
 }
 
-// Read a set of records from a set of files and return the resulting list, which may be in any
-// order (b/c concurrency) but will always be freshly allocated (pointing to shared, immutable data
-// objects). We do this by passing read request to the pool of readers and collecting the results.
+// Read a set of records from a set of files and return a slice of slices, normally one inner slice
+// per file. The outer slice is always freshly allocated but the inner slices, though immutable,
+// are owned by the database layer, as is their underlying storage. The objects pointed to from the
+// inner slices are also shared and immutable. The inner slices may be in any order due to
+// concurrency in the database access layer (reading is implemented by passing the read request
+// to the pool of readers and collecting the results).
+//
+// To be safe, clients should iterate over the data structure as soon as they can and not retain
+// references to the slices longer than necessary. The inner slices *MUST* not be mutated; in
+// particular, they must not be sorted.
 //
 // On return, `dropped` is an indication of the number of benign errors, but it conflates dropped
 // records and dropped fields. err is non-nil only for non-benign records, in which case it
@@ -399,12 +412,16 @@ func init() {
 //
 // TODO: IMPROVEME: The API is a little crusty. We could distinguish dropped fields vs dropped
 // records, and we could sensibly return partial results too.
+//
+// TODO: We could strengthen the notion of immutability of the results by wrapping the result set in
+// a typesafe container that can be iterated over, esp in Go 1.23 or later. But even defining a
+// type a la ResultSet[T] and returning that would help, probably.
 
 func readRecordsFromFiles[T any](
     files []*LogFile,
     verbose bool,
     reader ReadSyncMethods,
-) (records []*T, dropped int, err error) {
+) (recordBlobs [][]*T, dropped int, err error) {
     if verbose {
         Log.Infof("%d files", len(files))
     }
@@ -414,7 +431,7 @@ func readRecordsFromFiles[T any](
     // about 32e6 records).
     results := make(chan parseResult, 100)
 
-    records = make([]*T, 0)
+    recordBlobs = make([][]*T, 0)
 
     toReceive := len(files)
     toSend := 0
@@ -434,7 +451,7 @@ func readRecordsFromFiles[T any](
         if res.err != nil {
             bad += " " + res.err.Error() + "\n"
         } else {
-            records = append(records, res.data.([]*T)...)
+            recordBlobs = append(recordBlobs, res.data.([]*T))
             dropped += res.dropped
         }
         toReceive--
@@ -445,14 +462,14 @@ func readRecordsFromFiles[T any](
         if res.err != nil {
             bad += " " + res.err.Error() + "\n"
         } else {
-            records = append(records, res.data.([]*T)...)
+            recordBlobs = append(recordBlobs, res.data.([]*T))
             dropped += res.dropped
         }
         toReceive--
     }
 
     if bad != "" {
-        records = nil
+        recordBlobs = nil
         err = fmt.Errorf("Failed to process one or more files:\n%s", bad)
         return
     }
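Note: the second TODO above invites a typesafe wrapper for the result set. A minimal sketch of one possible shape - not part of this patch, all names hypothetical - using Go 1.23 range-over-func iterators:

    package db

    import "iter"

    // ResultSet hides the inner slices so that clients can iterate over the
    // records but cannot mutate or sort the shared storage.
    type ResultSet[T any] struct {
        blobs [][]*T
    }

    // All yields every record across all inner slices, hiding the blob
    // boundaries and their arbitrary order from the caller.
    func (rs ResultSet[T]) All() iter.Seq[*T] {
        return func(yield func(*T) bool) {
            for _, blob := range rs.blobs {
                for _, rec := range blob {
                    if !yield(rec) {
                        return
                    }
                }
            }
        }
    }

A client would then write `for s := range rs.All() { ... }` and never see the shared slices at all.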
diff --git a/code/sonalyze/db/clusterstore_test.go b/code/sonalyze/db/clusterstore_test.go
index 336d71f0..c308b19f 100644
--- a/code/sonalyze/db/clusterstore_test.go
+++ b/code/sonalyze/db/clusterstore_test.go
@@ -18,6 +18,7 @@ import (
 
     "go-utils/filesys"
     "go-utils/hostglob"
+    uslices "go-utils/slices"
     "go-utils/status"
     . "sonalyze/common"
 )
@@ -103,13 +104,17 @@ func TestTransientSampleRead(t *testing.T) {
     }
     // The parameters are ignored here
     var d time.Time
-    samples, dropped, err := fs.ReadSamples(d, d, nil, verbose)
+    sampleBlobs, dropped, err := fs.ReadSamples(d, d, nil, verbose)
     if err != nil {
         t.Fatal(err)
     }
     _ = dropped
-    if len(samples) != 7 {
-        t.Fatal("Length", samples)
+    n := 0
+    for _, s := range sampleBlobs {
+        n += len(s)
+    }
+    if n != 7 {
+        t.Fatal("Length", sampleBlobs)
     }
 }
 
@@ -168,7 +173,7 @@ func TestPersistentSampleRead(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    samples, dropped, err := pc.ReadSamples(
+    sampleBlobs, dropped, err := pc.ReadSamples(
         time.Date(2023, 05, 28, 12, 37, 55, 0, time.UTC),
         time.Date(2023, 05, 31, 23, 0, 12, 0, time.UTC),
         nil,
@@ -182,8 +187,12 @@ func TestPersistentSampleRead(t *testing.T) {
     if dropped != 1 {
         t.Fatal("Dropped", dropped)
     }
-    if len(samples) != 7 {
-        t.Fatal("Length", samples)
+    n := 0
+    for _, s := range sampleBlobs {
+        n += len(s)
+    }
+    if n != 7 {
+        t.Fatal("Length", sampleBlobs)
     }
 }
 
@@ -198,7 +207,7 @@ func TestPersistentSysinfoRead(t *testing.T) {
     // 5/30 "b" should have one record
     // 5/31 "a" should have two records, not equal
     // 5/32 "b" should have two records, equal
-    records, dropped, err := pc.ReadSysinfo(
+    recordBlobs, dropped, err := pc.ReadSysinfo(
         time.Date(2023, 05, 28, 12, 37, 55, 0, time.UTC),
         time.Date(2023, 05, 31, 23, 0, 12, 0, time.UTC),
         nil,
@@ -210,8 +219,12 @@ func TestPersistentSysinfoRead(t *testing.T) {
     if dropped != 0 {
         t.Fatal("Dropped", dropped)
     }
-    if len(records) != 6 {
-        t.Fatal("Length", records)
+    n := 0
+    for _, rs := range recordBlobs {
+        n += len(rs)
+    }
+    if n != 6 {
+        t.Fatal("Length", recordBlobs)
     }
 }
 
@@ -310,7 +323,7 @@ func TestPersistentSysinfoAppend(t *testing.T) {
     // to not see the data - a synchronous flush is technically required - and if we ever implement
     // that path then this test will need to have a FlushSync() call before the read.
 
-    records, _, err := pc.ReadSysinfo(
+    recordBlobs, _, err := pc.ReadSysinfo(
         time.Date(2023, 05, 28, 12, 37, 55, 0, time.UTC),
         time.Date(2023, 05, 28, 23, 0, 12, 0, time.UTC),
         nil,
@@ -321,11 +334,15 @@ func TestPersistentSysinfoAppend(t *testing.T) {
     }
     // In the original data we had nonempty sysinfo only for "a" on "2023/05/28", and only one
     // record, so now we should have 3 in the first window and 1 in the second window.
-    if len(records) != 3 {
-        t.Fatal("Length", records)
+    n := 0
+    for _, rs := range recordBlobs {
+        n += len(rs)
+    }
+    if n != 3 {
+        t.Fatal("Length", recordBlobs)
     }
 
-    records2, _, err := pc.ReadSysinfo(
+    recordBlobs2, _, err := pc.ReadSysinfo(
         time.Date(2024, 01, 01, 12, 37, 55, 0, time.UTC),
         time.Date(2024, 05, 01, 23, 0, 12, 0, time.UTC),
         nil,
@@ -334,8 +351,12 @@ func TestPersistentSysinfoAppend(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    if len(records2) != 1 {
-        t.Fatal("Length", records)
+    n = 0
+    for _, rs := range recordBlobs2 {
+        n += len(rs)
+    }
+    if n != 1 {
+        t.Fatal("Length", recordBlobs)
     }
 
     pc.Close()
@@ -577,7 +598,7 @@ func TestCaching(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    samples, _, err := pc.ReadSamples(
+    sampleBlobs, _, err := pc.ReadSamples(
         time.Date(2023, 05, 31, 0, 0, 0, 0, time.UTC),
         time.Date(2023, 06, 01, 0, 0, 0, 0, time.UTC),
         glob,
@@ -595,9 +616,18 @@ func TestCaching(t *testing.T) {
     if !m {
         t.Fatal("Missing caching msg", msgs)
     }
-    cmd := samples[len(samples)-1].Cmd
-    if cmd.String() != "awk" {
-        t.Fatal(cmd)
+
+    // There should be exactly one record matching cmd from above in that record set, as there were
+    // no awk commands to begin with
+
+    awks := 0
+    for _, s := range uslices.Catenate(sampleBlobs) {
+        if s.Cmd.String() == "awk" {
+            awks++
+        }
+    }
+    if awks != 1 {
+        t.Fatal("append failed")
     }
 
     // Purging of dirty data causes writeback
@@ -654,14 +684,19 @@ func TestCaching(t *testing.T) {
     if !m {
         t.Fatal("Missing caching msg", msgs)
     }
-    samples, _, err = pc.ReadSamples(
+    sampleBlobs, _, err = pc.ReadSamples(
         time.Date(2023, 05, 31, 0, 0, 0, 0, time.UTC),
         time.Date(2023, 06, 01, 0, 0, 0, 0, time.UTC),
         glob,
         false,
     )
-    cmd = samples[len(samples)-1].Cmd
-    if cmd.String() != "zappa" {
-        t.Fatal(cmd)
+    zappas := 0
+    for _, s := range uslices.Catenate(sampleBlobs) {
+        if s.Cmd.String() == "zappa" {
+            zappas++
+        }
+    }
+    if zappas != 1 {
+        t.Fatal("append failed")
     }
 }
diff --git a/code/sonalyze/db/persistentcluster.go b/code/sonalyze/db/persistentcluster.go
index d6d6ee81..50039d9c 100644
--- a/code/sonalyze/db/persistentcluster.go
+++ b/code/sonalyze/db/persistentcluster.go
@@ -262,7 +262,7 @@ func (pc *PersistentCluster) ReadSamples(
     fromDate, toDate time.Time,
     hosts *hostglob.HostGlobber,
     verbose bool,
-) (samples []*Sample, dropped int, err error) {
+) (sampleBlobs [][]*Sample, dropped int, err error) {
     if DEBUG {
         Assert(fromDate.Location() == time.UTC, "UTC expected")
         Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -277,7 +277,7 @@ func (pc *PersistentCluster) ReadLoadData(
     fromDate, toDate time.Time,
     hosts *hostglob.HostGlobber,
     verbose bool,
-) (data []*LoadDatum, dropped int, err error) {
+) (dataBlobs [][]*LoadDatum, dropped int, err error) {
     if DEBUG {
         Assert(fromDate.Location() == time.UTC, "UTC expected")
         Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -292,7 +292,7 @@ func (pc *PersistentCluster) ReadSysinfo(
     fromDate, toDate time.Time,
     hosts *hostglob.HostGlobber,
     verbose bool,
-) (samples []*config.NodeConfigRecord, dropped int, err error) {
+) (sysinfoBlobs [][]*config.NodeConfigRecord, dropped int, err error) {
     if DEBUG {
         Assert(fromDate.Location() == time.UTC, "UTC expected")
         Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -306,7 +306,7 @@ func (pc *PersistentCluster) ReadSacctData(
     fromDate, toDate time.Time,
     verbose bool,
-) (records []*SacctInfo, dropped int, err error) {
+) (sacctBlobs [][]*SacctInfo, dropped int, err error) {
     if DEBUG {
         Assert(fromDate.Location() == time.UTC, "UTC expected")
         Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -317,7 +317,7 @@ func (pc *PersistentCluster) ReadSacctData(
     )
 }
 
-func readPersistentClusterRecords[V any, U ~[]*V](
+func readPersistentClusterRecords[V any, U ~[][]*V](
     pc *PersistentCluster,
     fromDate, toDate time.Time,
     hosts *hostglob.HostGlobber,
@@ -325,7 +325,7 @@ func readPersistentClusterRecords[V any, U ~[]*V](
     fa filesAdapter,
     methods ReadSyncMethods,
     reader func(files []*LogFile, verbose bool, methods ReadSyncMethods) (U, int, error),
-) (records U, dropped int, err error) {
+) (recordBlobs U, dropped int, err error) {
     // TODO: IMPROVEME: Don't hold the lock while reading, it's not necessary, caching is per-file
     // and does not interact with the cluster. But be sure to get pc.cfg while holding the lock.
     pc.Lock()
diff --git a/code/sonalyze/db/sacctfile.go b/code/sonalyze/db/sacctfile.go
index e54fdaf6..a788b08b 100644
--- a/code/sonalyze/db/sacctfile.go
+++ b/code/sonalyze/db/sacctfile.go
@@ -53,6 +53,6 @@ func readSacctSlice(
     files []*LogFile,
     verbose bool,
     reader ReadSyncMethods,
-) ([]*SacctInfo, int, error) {
+) ([][]*SacctInfo, int, error) {
     return readRecordsFromFiles[SacctInfo](files, verbose, reader)
 }
diff --git a/code/sonalyze/db/samplefile.go b/code/sonalyze/db/samplefile.go
index d8886c68..d52d8aa6 100644
--- a/code/sonalyze/db/samplefile.go
+++ b/code/sonalyze/db/samplefile.go
@@ -100,7 +100,7 @@ func readSampleSlice(
     files []*LogFile,
     verbose bool,
     reader ReadSyncMethods,
-) (samples []*Sample, dropped int, err error) {
+) (sampleBlobs [][]*Sample, dropped int, err error) {
     return readRecordsFromFiles[Sample](files, verbose, reader)
 }
 
@@ -108,6 +108,6 @@ func readLoadDatumSlice(
     files []*LogFile,
     verbose bool,
     reader ReadSyncMethods,
-) (samples []*LoadDatum, dropped int, err error) {
+) (loadDataBlobs [][]*LoadDatum, dropped int, err error) {
     return readRecordsFromFiles[LoadDatum](files, verbose, reader)
 }
diff --git a/code/sonalyze/db/sysinfofile.go b/code/sonalyze/db/sysinfofile.go
index 89cde336..0be9070c 100644
--- a/code/sonalyze/db/sysinfofile.go
+++ b/code/sonalyze/db/sysinfofile.go
@@ -39,6 +39,6 @@ func readNodeConfigRecordSlice(
     files []*LogFile,
     verbose bool,
     reader ReadSyncMethods,
-) ([]*config.NodeConfigRecord, int, error) {
+) ([][]*config.NodeConfigRecord, int, error) {
     return readRecordsFromFiles[config.NodeConfigRecord](files, verbose, reader)
 }
diff --git a/code/sonalyze/db/transientcluster.go b/code/sonalyze/db/transientcluster.go
index ec972c35..eee6d2ca 100644
--- a/code/sonalyze/db/transientcluster.go
+++ b/code/sonalyze/db/transientcluster.go
@@ -110,7 +110,7 @@ func (tsc *TransientSampleCluster) ReadSamples(
     _, _ time.Time,
     _ *hostglob.HostGlobber,
     verbose bool,
-) (samples []*Sample, dropped int, err error) {
+) (sampleBlobs [][]*Sample, dropped int, err error) {
     tsc.Lock()
     defer tsc.Unlock()
     if tsc.closed {
@@ -124,7 +124,7 @@ func (tsc *TransientSampleCluster) ReadLoadData(
     _, _ time.Time,
     _ *hostglob.HostGlobber,
     verbose bool,
-) (data []*LoadDatum, dropped int, err error) {
+) (dataBlobs [][]*LoadDatum, dropped int, err error) {
     tsc.Lock()
     defer tsc.Unlock()
     if tsc.closed {
@@ -161,7 +161,7 @@ func (tsc *TransientSacctCluster) SacctFilenames(_, _ time.Time) ([]string, erro
 func (tsc *TransientSacctCluster) ReadSacctData(
     fromDate, toDate time.Time,
     verbose bool,
-) (records []*SacctInfo, dropped int, err error) {
+) (recordBlobs [][]*SacctInfo, dropped int, err error) {
     tsc.Lock()
     defer tsc.Unlock()
     if tsc.closed {
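A note on the constraint change in readPersistentClusterRecords (persistentcluster.go above): `U ~[]*V` becomes `U ~[][]*V`, the generic mirror of the API change. The approximation element `~` admits any type whose underlying type is a slice of slices of `*V`, so the reader functions above (readSacctSlice, readSampleSlice, and friends) instantiate the helper with both type parameters inferred at the call site. A standalone illustration of the mechanism, with hypothetical types not taken from this patch:

    package main

    import "fmt"

    type Rec struct{ n int }

    // blobs is a defined type whose underlying type is [][]*Rec.
    type blobs [][]*Rec

    // count accepts blobs as well as a plain [][]*Rec, because the
    // constraint uses the approximation element `~`.
    func count[V any, U ~[][]*V](u U) int {
        total := 0
        for _, inner := range u {
            total += len(inner)
        }
        return total
    }

    func main() {
        b := blobs{{&Rec{1}}, {&Rec{2}, &Rec{3}}}
        fmt.Println(count(b)) // 3
    }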
diff --git a/code/sonalyze/parse/parse.go b/code/sonalyze/parse/parse.go
index 5576b722..0c5e3ea8 100644
--- a/code/sonalyze/parse/parse.go
+++ b/code/sonalyze/parse/parse.go
@@ -11,7 +11,7 @@ import (
     "go-utils/config"
     "go-utils/hostglob"
     "go-utils/maps"
-    "go-utils/slices"
+    uslices "go-utils/slices"
     . "sonalyze/command"
     . "sonalyze/common"
     "sonalyze/db"
@@ -104,19 +104,23 @@ func (pc *ParseCommand) Perform(
         // Reread the data, bypassing all postprocessing, to get the expected raw values. If it's
         // expensive then so be it - this is special-case code usually used for limited testing, not
         // something you'd use for analysis.
-        records, _, err := cluster.ReadSamples(pc.FromDate, pc.ToDate, hostGlobber, pc.Verbose)
+        recordBlobs, _, err := cluster.ReadSamples(pc.FromDate, pc.ToDate, hostGlobber, pc.Verbose)
         if err != nil {
             return err
         }
 
         // Simulate the normal pipeline, the recordFilter application is expected by the user.
-        samples = sonarlog.SampleStream(slices.FilterMap(
-            records,
-            recordFilter,
-            func(r *db.Sample) sonarlog.Sample {
-                return sonarlog.Sample{S: r}
-            },
-        ))
+        mapped := make([]sonarlog.Sample, 0)
+        for _, records := range recordBlobs {
+            mapped = append(mapped, uslices.FilterMap(
+                records,
+                recordFilter,
+                func(r *db.Sample) sonarlog.Sample {
+                    return sonarlog.Sample{S: r}
+                },
+            )...)
+        }
+        samples = sonarlog.SampleStream(mapped)
 
     case pc.Clean:
         mergedSamples = maps.Values(streams)
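The per-blob loop above could equally be written as one FilterMap over a flattened copy, roughly as below (assuming the FilterMap and Catenate signatures implied by their uses elsewhere in this patch). The loop form avoids the extra pass and allocation that the Catenate would add, which is presumably why it was chosen:

    samples = sonarlog.SampleStream(uslices.FilterMap(
        uslices.Catenate(recordBlobs),
        recordFilter,
        func(r *db.Sample) sonarlog.Sample {
            return sonarlog.Sample{S: r}
        },
    ))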
diff --git a/code/sonalyze/sacct/perform.go b/code/sonalyze/sacct/perform.go
index 6cfdc5bc..b6440416 100644
--- a/code/sonalyze/sacct/perform.go
+++ b/code/sonalyze/sacct/perform.go
@@ -5,6 +5,7 @@ import (
     "io"
 
     "go-utils/hostglob"
+    uslices "go-utils/slices"
 
     . "sonalyze/common"
     "sonalyze/db"
@@ -39,7 +40,7 @@ func (sc *SacctCommand) Sacct(_ io.Reader, stdout, stderr io.Writer) error {
 
     // Read the raw sacct data.
-    records, dropped, err := theLog.ReadSacctData(
+    recordBlobs, dropped, err := theLog.ReadSacctData(
         sc.FromDate,
         sc.ToDate,
         sc.Verbose,
@@ -47,6 +48,8 @@ func (sc *SacctCommand) Sacct(_ io.Reader, stdout, stderr io.Writer) error {
     if err != nil {
         return fmt.Errorf("Failed to read log records\n%w", err)
     }
+    // FIXME: This is expedient
+    records := uslices.Catenate(recordBlobs)
     if sc.Verbose {
         Log.Infof("%d records read + %d dropped", len(records), dropped)
         UstrStats(stderr, false)
diff --git a/code/sonalyze/sonarlog/ingest.go b/code/sonalyze/sonarlog/ingest.go
index 626875d5..c284f3fd 100644
--- a/code/sonalyze/sonarlog/ingest.go
+++ b/code/sonalyze/sonarlog/ingest.go
@@ -34,12 +34,14 @@ func ReadSampleStreams(
     read, dropped int,
     err error,
 ) {
-    samples, dropped, err := c.ReadSamples(fromDate, toDate, hostGlobber, verbose)
+    sampleBlobs, dropped, err := c.ReadSamples(fromDate, toDate, hostGlobber, verbose)
     if err != nil {
         return
     }
-    read = len(samples)
-    streams, bounds = createInputStreams(samples, recordFilter)
+    for _, samples := range sampleBlobs {
+        read += len(samples)
+    }
+    streams, bounds = createInputStreams(sampleBlobs, recordFilter)
     return
 }
 
@@ -56,12 +58,14 @@ func ReadLoadDataStreams(
 ) {
     // Read and establish invariants
 
-    data, dropped, err := c.ReadLoadData(fromDate, toDate, hostGlobber, verbose)
+    dataBlobs, dropped, err := c.ReadLoadData(fromDate, toDate, hostGlobber, verbose)
     if err != nil {
         return
     }
-    read = len(data)
-    streams, bounds, errors := rectifyLoadData(data)
+    for _, data := range dataBlobs {
+        read += len(data)
+    }
+    streams, bounds, errors := rectifyLoadData(dataBlobs)
     dropped += errors
     return
 }
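Both the expedient fix in perform.go above and the updated tests rely on uslices.Catenate to flatten the blobs. The patch does not show go-utils/slices, but the helper presumably has the shape sketched below: it allocates a fresh outer slice and copies only the record pointers, so the database-owned inner slices are read but never written, in keeping with the contract documented in clusterstore.go:

    package slices

    // Catenate flattens a slice of slices into one freshly allocated slice.
    // The input slices are only read, never mutated.
    func Catenate[T any](xss [][]T) []T {
        n := 0
        for _, xs := range xss {
            n += len(xs)
        }
        ys := make([]T, 0, n)
        for _, xs := range xss {
            ys = append(ys, xs...)
        }
        return ys
    }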
diff --git a/code/sonalyze/sonarlog/postprocess.go b/code/sonalyze/sonarlog/postprocess.go
index 95d52a3d..7ce48742 100644
--- a/code/sonalyze/sonarlog/postprocess.go
+++ b/code/sonalyze/sonarlog/postprocess.go
@@ -88,7 +88,7 @@ func standardSampleRectifier(xs []*db.Sample, cfg *config.ClusterConfig) []*db.S
 // adjacent records with the same timestamp.
 
 func createInputStreams(
-    entries []*db.Sample,
+    entryBlobs [][]*db.Sample,
     recordFilter db.SampleFilter,
 ) (InputStreamSet, Timebounds) {
     streams := make(InputStreamSet)
@@ -100,28 +100,30 @@ func createInputStreams(
     // Also compute time bounds. Bounds are computed from the entire input set before filtering or
     // other postprocessing - this is closest to what's expected. But that makes the bound
     // computation a significant expense.
-    for _, e := range entries {
-        if bound, found := bounds[e.Host]; found {
-            bounds[e.Host] = Timebound{
-                Earliest: min(bound.Earliest, e.Timestamp),
-                Latest:   max(bound.Latest, e.Timestamp),
-            }
-        } else {
-            bounds[e.Host] = Timebound{
-                Earliest: e.Timestamp,
-                Latest:   e.Timestamp,
+    for _, entries := range entryBlobs {
+        for _, e := range entries {
+            if bound, found := bounds[e.Host]; found {
+                bounds[e.Host] = Timebound{
+                    Earliest: min(bound.Earliest, e.Timestamp),
+                    Latest:   max(bound.Latest, e.Timestamp),
+                }
+            } else {
+                bounds[e.Host] = Timebound{
+                    Earliest: e.Timestamp,
+                    Latest:   e.Timestamp,
+                }
             }
-        }
 
-        if !recordFilter(e) {
-            continue
-        }
+            if !recordFilter(e) {
+                continue
+            }
 
-        key := InputStreamKey{e.Host, streamId(e), e.Cmd}
-        if stream, found := streams[key]; found {
-            *stream = append(*stream, Sample{S: e})
-        } else {
-            streams[key] = &SampleStream{Sample{S: e}}
+            key := InputStreamKey{e.Host, streamId(e), e.Cmd}
+            if stream, found := streams[key]; found {
+                *stream = append(*stream, Sample{S: e})
+            } else {
+                streams[key] = &SampleStream{Sample{S: e}}
+            }
         }
     }
@@ -235,27 +237,29 @@ func removeEmptyStreams(streams InputStreamSet) {
     })
 }
 
-func rectifyLoadData(data []*db.LoadDatum) (streams LoadDataSet, bounds Timebounds, errors int) {
+func rectifyLoadData(dataBlobs [][]*db.LoadDatum) (streams LoadDataSet, bounds Timebounds, errors int) {
     // Divide data among the hosts and decode each array
     streams = make(LoadDataSet)
-    for _, d := range data {
-        decoded, err := decodeLoadData(d.Encoded)
-        if err != nil {
-            errors++
-            continue
-        }
-        datum := LoadDatum{
-            Time:    d.Timestamp,
-            Decoded: decoded,
-        }
-        if stream, found := streams[d.Host]; found {
-            stream.Data = append(stream.Data, datum)
-        } else {
-            stream := LoadData{
-                Host: d.Host,
-                Data: []LoadDatum{datum},
+    for _, data := range dataBlobs {
+        for _, d := range data {
+            decoded, err := decodeLoadData(d.Encoded)
+            if err != nil {
+                errors++
+                continue
+            }
+            datum := LoadDatum{
+                Time:    d.Timestamp,
+                Decoded: decoded,
+            }
+            if stream, found := streams[d.Host]; found {
+                stream.Data = append(stream.Data, datum)
+            } else {
+                stream := LoadData{
+                    Host: d.Host,
+                    Data: []LoadDatum{datum},
+                }
+                streams[d.Host] = &stream
             }
-            streams[d.Host] = &stream
         }
     }
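Since the inner slices must never be sorted in place, a consumer that needs a global ordering has to flatten first and sort the copy. A sketch of the safe pattern in hypothetical client code (the records themselves stay shared and read-only; assumes `sort` is imported and `sampleBlobs` is in scope):

    // Fresh outer slice; sorting it cannot disturb the database's cache.
    flat := uslices.Catenate(sampleBlobs)
    sort.Slice(flat, func(i, j int) bool {
        return flat[i].Timestamp < flat[j].Timestamp
    })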
diff --git a/code/sonalyze/sonarlog/postprocess_test.go b/code/sonalyze/sonarlog/postprocess_test.go
index a1910310..1c516e83 100644
--- a/code/sonalyze/sonarlog/postprocess_test.go
+++ b/code/sonalyze/sonarlog/postprocess_test.go
@@ -43,17 +43,19 @@ func TestRectifyGpuMem(t *testing.T) {
     // gpumem%=12 so we should see a computed value for gpukib which is different from the gpukib
     // figure in the data.
     var notime time.Time
-    samples, _, err := c.ReadSamples(notime, notime, nil, false)
+    sampleBlobs, _, err := c.ReadSamples(notime, notime, nil, false)
     if err != nil {
         t.Fatal(err)
     }
     found := 0
     expect := uint64((memsize * 12) / 100 * 1024 * 1024)
-    for _, s := range samples {
-        if s.Job == 1249151 {
-            found++
-            if s.GpuKib != expect {
-                t.Errorf("GpuKib %v expected %v (%v %v)", s.GpuKib, expect, s.GpuPct, s.GpuMemPct)
+    for _, samples := range sampleBlobs {
+        for _, s := range samples {
+            if s.Job == 1249151 {
+                found++
+                if s.GpuKib != expect {
+                    t.Errorf("GpuKib %v expected %v (%v %v)", s.GpuKib, expect, s.GpuPct, s.GpuMemPct)
+                }
             }
         }
     }
@@ -85,7 +87,7 @@ func TestPostprocessLogCpuUtilPct(t *testing.T) {
     filter := func(r *db.Sample) bool {
         return r.User != root
     }
-    streams, _ := createInputStreams(entries, filter)
+    streams, _ := createInputStreams([][]*db.Sample{entries}, filter)
     ComputePerSampleFields(streams)
 
     if len(streams) != 4 {