For #526 - Change DB APIs to return slice-of-slice with shared data
Lars T Hansen committed Sep 17, 2024
1 parent 7789759 commit 2a5b247
Showing 12 changed files with 180 additions and 111 deletions.
45 changes: 31 additions & 14 deletions code/sonalyze/db/clusterstore.go
@@ -105,20 +105,22 @@ type SampleCluster interface {
) ([]string, error)

// Read `ps` samples from all the files selected by SampleFilenames() and extract the
// per-process sample data. Times must be UTC.
// per-process sample data. Times must be UTC. The inner slices of the result, and the
// records they point to, must not be mutated in any way.
ReadSamples(
fromDate, toDate time.Time,
hosts *hostglob.HostGlobber,
verbose bool,
) (samples []*Sample, dropped int, err error)
) (sampleBlobs [][]*Sample, dropped int, err error)

// Read `ps` samples from all the files selected by SampleFilenames() and extract the load data.
// Times must be UTC.
// Times must be UTC. The inner slices of the result, and the records they point to, must not
// be mutated in any way.
ReadLoadData(
fromDate, toDate time.Time,
hosts *hostglob.HostGlobber,
verbose bool,
) (data []*LoadDatum, dropped int, err error)
) (dataBlobs [][]*LoadDatum, dropped int, err error)
}
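
A minimal sketch of a conforming caller, assuming the sonalyze/db package context (the helper name is hypothetical; the counting pattern mirrors the updated tests in this commit):

func countSamples(sampleBlobs [][]*Sample) int {
	n := 0
	for _, blob := range sampleBlobs {
		// Read-only traversal: each inner slice and the records it points
		// to are shared with the database layer and must not be mutated.
		n += len(blob)
	}
	return n
}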

// A SysinfoCluster can provide `sonar sysinfo` data: per-system hardware configuration data.
@@ -133,11 +135,13 @@ type SysinfoCluster interface {
) ([]string, error)

// Read `sysinfo` records from all the files selected by SysinfoFilenames(). Times must be UTC.
// The inner slices of the result, and the records they point to, must not be mutated in any
// way.
ReadSysinfo(
fromDate, toDate time.Time,
hosts *hostglob.HostGlobber,
verbose bool,
) (records []*config.NodeConfigRecord, dropped int, err error)
) (sysinfoBlobs [][]*config.NodeConfigRecord, dropped int, err error)
}

// There is no HostGlobber here, as the sacct data are mostly not node-oriented. Any analysis
@@ -152,10 +156,12 @@ type SacctCluster interface {
) ([]string, error)

// Read `sacct` records from all the files selected by SacctFilenames(). Times must be UTC.
// The inner slices of the result, and the records they point to, must not be mutated in any
// way.
ReadSacctData(
fromDate, toDate time.Time,
verbose bool,
) (records []*SacctInfo, dropped int, err error)
) (recordBlobs [][]*SacctInfo, dropped int, err error)
}

// An AppendableCluster (not yet well developed, this could be split into appending different types
@@ -389,22 +395,33 @@ func init() {
}
}

// Read a set of records from a set of files and return the resulting list, which may be in any
// order (b/c concurrency) but will always be freshly allocated (pointing to shared, immutable data
// objects). We do this by passing read request to the pool of readers and collecting the results.
// Read a set of records from a set of files and return a slice of slices, normally one inner slice
// per file. The outer slice is always freshly allocated but the inner slices, though immutable,
// are owned by the database layer, as is their underlying storage. The objects pointed to from the
// inner slices are also shared and immutable. The inner slices may be in any order due to
// concurrency in the database access layer (reading is implemented by passing the read request
// to the pool of readers and collecting the results).
//
// To be safe, clients should iterate over the data structure as soon as they can and not retain
// references to the slices longer than necessary. The inner slices *MUST* not be mutated; in
// particular, they must not be sorted.
//
// On return, `dropped` is an indication of the number of benign errors, but it conflates dropped
// records and dropped fields. err is non-nil only for non-benign errors, in which case it
// attempts to collect information about all errors encountered.
//
// TODO: IMPROVEME: The API is a little crusty. We could distinguish dropped fields vs dropped
// records, and we could sensibly return partial results too.
//
// TODO: We could strengthen the notion of immutability of the results by wrapping the result set in
// a typesafe container that can be iterated over, esp in Go 1.23 or later. But even defining a
// type a la ResultSet[T] and returning that would help, probably.

func readRecordsFromFiles[T any](
files []*LogFile,
verbose bool,
reader ReadSyncMethods,
) (records []*T, dropped int, err error) {
) (recordBlobs [][]*T, dropped int, err error) {
if verbose {
Log.Infof("%d files", len(files))
}
@@ -414,7 +431,7 @@ func readRecordsFromFiles[T any](
// about 32e6 records).

results := make(chan parseResult, 100)
records = make([]*T, 0)
recordBlobs = make([][]*T, 0)

toReceive := len(files)
toSend := 0
@@ -434,7 +451,7 @@ func readRecordsFromFiles[T any](
if res.err != nil {
bad += " " + res.err.Error() + "\n"
} else {
records = append(records, res.data.([]*T)...)
recordBlobs = append(recordBlobs, res.data.([]*T))
dropped += res.dropped
}
toReceive--
@@ -445,14 +462,14 @@ func readRecordsFromFiles[T any](
if res.err != nil {
bad += " " + res.err.Error() + "\n"
} else {
records = append(records, res.data.([]*T)...)
recordBlobs = append(recordBlobs, res.data.([]*T))
dropped += res.dropped
}
toReceive--
}

if bad != "" {
records = nil
recordBlobs = nil
err = fmt.Errorf("Failed to process one or more files:\n%s", bad)
return
}
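
The TODO above suggests a ResultSet[T] wrapper. A sketch of that idea, purely illustrative since nothing in this commit defines such a type: a read-only container that never hands out the shared inner slices.

// ResultSet hides the blob structure behind read-only iteration.
type ResultSet[T any] struct {
	blobs [][]*T
}

// Each applies f to every record without exposing the inner slices.
func (rs ResultSet[T]) Each(f func(*T)) {
	for _, blob := range rs.blobs {
		for _, r := range blob {
			f(r)
		}
	}
}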
81 changes: 58 additions & 23 deletions code/sonalyze/db/clusterstore_test.go
@@ -18,6 +18,7 @@ import (

"go-utils/filesys"
"go-utils/hostglob"
uslices "go-utils/slices"
"go-utils/status"
. "sonalyze/common"
)
@@ -103,13 +104,17 @@ func TestTransientSampleRead(t *testing.T) {
}
// The parameters are ignored here
var d time.Time
samples, dropped, err := fs.ReadSamples(d, d, nil, verbose)
sampleBlobs, dropped, err := fs.ReadSamples(d, d, nil, verbose)
if err != nil {
t.Fatal(err)
}
_ = dropped
if len(samples) != 7 {
t.Fatal("Length", samples)
n := 0
for _, s := range sampleBlobs {
n += len(s)
}
if n != 7 {
t.Fatal("Length", sampleBlobs)
}
}

@@ -168,7 +173,7 @@ func TestPersistentSampleRead(t *testing.T) {
if err != nil {
t.Fatal(err)
}
samples, dropped, err := pc.ReadSamples(
sampleBlobs, dropped, err := pc.ReadSamples(
time.Date(2023, 05, 28, 12, 37, 55, 0, time.UTC),
time.Date(2023, 05, 31, 23, 0, 12, 0, time.UTC),
nil,
Expand All @@ -182,8 +187,12 @@ func TestPersistentSampleRead(t *testing.T) {
if dropped != 1 {
t.Fatal("Dropped", dropped)
}
if len(samples) != 7 {
t.Fatal("Length", samples)
n := 0
for _, s := range sampleBlobs {
n += len(s)
}
if n != 7 {
t.Fatal("Length", sampleBlobs)
}
}

@@ -198,7 +207,7 @@ func TestPersistentSysinfoRead(t *testing.T) {
// 5/30 "b" should have one record
// 5/31 "a" should have two records, not equal
// 5/32 "b" should have two records, equal
records, dropped, err := pc.ReadSysinfo(
recordBlobs, dropped, err := pc.ReadSysinfo(
time.Date(2023, 05, 28, 12, 37, 55, 0, time.UTC),
time.Date(2023, 05, 31, 23, 0, 12, 0, time.UTC),
nil,
Expand All @@ -210,8 +219,12 @@ func TestPersistentSysinfoRead(t *testing.T) {
if dropped != 0 {
t.Fatal("Dropped", dropped)
}
if len(records) != 6 {
t.Fatal("Length", records)
n := 0
for _, rs := range recordBlobs {
n += len(rs)
}
if n != 6 {
t.Fatal("Length", recordBlobs)
}
}

@@ -310,7 +323,7 @@ func TestPersistentSysinfoAppend(t *testing.T) {
// to not see the data - a synchronous flush is technically required - and if we ever implement
// that path then this test will need to have a FlushSync() call before the read.

records, _, err := pc.ReadSysinfo(
recordBlobs, _, err := pc.ReadSysinfo(
time.Date(2023, 05, 28, 12, 37, 55, 0, time.UTC),
time.Date(2023, 05, 28, 23, 0, 12, 0, time.UTC),
nil,
Expand All @@ -321,11 +334,15 @@ func TestPersistentSysinfoAppend(t *testing.T) {
}
// In the original data we had nonempty sysinfo only for "a" on "2023/05/28", and only one
// record, so now we should have 3 in the first window and 1 in the second window.
if len(records) != 3 {
t.Fatal("Length", records)
n := 0
for _, rs := range recordBlobs {
n += len(rs)
}
if n != 3 {
t.Fatal("Length", recordBlobs)
}

records2, _, err := pc.ReadSysinfo(
recordBlobs2, _, err := pc.ReadSysinfo(
time.Date(2024, 01, 01, 12, 37, 55, 0, time.UTC),
time.Date(2024, 05, 01, 23, 0, 12, 0, time.UTC),
nil,
Expand All @@ -334,8 +351,12 @@ func TestPersistentSysinfoAppend(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(records2) != 1 {
t.Fatal("Length", records)
n = 0
for _, rs := range recordBlobs2 {
n += len(rs)
}
if n != 1 {
t.Fatal("Length", recordBlobs)
}

pc.Close()
@@ -577,7 +598,7 @@ func TestCaching(t *testing.T) {
if err != nil {
t.Fatal(err)
}
samples, _, err := pc.ReadSamples(
sampleBlobs, _, err := pc.ReadSamples(
time.Date(2023, 05, 31, 0, 0, 0, 0, time.UTC),
time.Date(2023, 06, 01, 0, 0, 0, 0, time.UTC),
glob,
@@ -595,9 +616,18 @@
if !m {
t.Fatal("Missing caching msg", msgs)
}
cmd := samples[len(samples)-1].Cmd
if cmd.String() != "awk" {
t.Fatal(cmd)

// There should be exactly one record matching cmd from above in that record set, as there were
// no awk commands to begin with

awks := 0
for _, s := range uslices.Catenate(sampleBlobs) {
if s.Cmd.String() == "awk" {
awks++
}
}
if awks != 1 {
t.Fatal("append failed")
}

// Purging of dirty data causes writeback
@@ -654,14 +684,19 @@ func TestCaching(t *testing.T) {
if !m {
t.Fatal("Missing caching msg", msgs)
}
samples, _, err = pc.ReadSamples(
sampleBlobs, _, err = pc.ReadSamples(
time.Date(2023, 05, 31, 0, 0, 0, 0, time.UTC),
time.Date(2023, 06, 01, 0, 0, 0, 0, time.UTC),
glob,
false,
)
cmd = samples[len(samples)-1].Cmd
if cmd.String() != "zappa" {
t.Fatal(cmd)
zappas := 0
for _, s := range uslices.Catenate(sampleBlobs) {
if s.Cmd.String() == "zappa" {
zappas++
}
}
if zappas != 1 {
t.Fatal("append failed")
}
}
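
The tests above flatten the blob structure with uslices.Catenate from go-utils/slices. Assuming Catenate is a plain concatenation, a generic helper with that behavior would look roughly like the sketch below; the actual go-utils implementation may differ. The returned slice is freshly allocated and client-owned, so it may be sorted or filtered freely, as long as the records it points to are left untouched.

// Catenate flattens a slice of slices into one freshly allocated slice.
func Catenate[T any](xss [][]T) []T {
	n := 0
	for _, xs := range xss {
		n += len(xs)
	}
	ys := make([]T, 0, n)
	for _, xs := range xss {
		ys = append(ys, xs...) // shallow copy: only the elements are copied
	}
	return ys
}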
12 changes: 6 additions & 6 deletions code/sonalyze/db/persistentcluster.go
@@ -262,7 +262,7 @@ func (pc *PersistentCluster) ReadSamples(
fromDate, toDate time.Time,
hosts *hostglob.HostGlobber,
verbose bool,
) (samples []*Sample, dropped int, err error) {
) (sampleBlobs [][]*Sample, dropped int, err error) {
if DEBUG {
Assert(fromDate.Location() == time.UTC, "UTC expected")
Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -277,7 +277,7 @@ func (pc *PersistentCluster) ReadLoadData(
fromDate, toDate time.Time,
hosts *hostglob.HostGlobber,
verbose bool,
) (data []*LoadDatum, dropped int, err error) {
) (dataBlobs [][]*LoadDatum, dropped int, err error) {
if DEBUG {
Assert(fromDate.Location() == time.UTC, "UTC expected")
Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -292,7 +292,7 @@ func (pc *PersistentCluster) ReadSysinfo(
fromDate, toDate time.Time,
hosts *hostglob.HostGlobber,
verbose bool,
) (samples []*config.NodeConfigRecord, dropped int, err error) {
) (sysinfoBlobs [][]*config.NodeConfigRecord, dropped int, err error) {
if DEBUG {
Assert(fromDate.Location() == time.UTC, "UTC expected")
Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -306,7 +306,7 @@ func (pc *PersistentCluster) ReadSacctData(
func (pc *PersistentCluster) ReadSacctData(
fromDate, toDate time.Time,
verbose bool,
) (records []*SacctInfo, dropped int, err error) {
) (sacctBlobs [][]*SacctInfo, dropped int, err error) {
if DEBUG {
Assert(fromDate.Location() == time.UTC, "UTC expected")
Assert(toDate.Location() == time.UTC, "UTC expected")
@@ -317,15 +317,15 @@ func (pc *PersistentCluster) ReadSacctData(
)
}

func readPersistentClusterRecords[V any, U ~[]*V](
func readPersistentClusterRecords[V any, U ~[][]*V](
pc *PersistentCluster,
fromDate, toDate time.Time,
hosts *hostglob.HostGlobber,
verbose bool,
fa filesAdapter,
methods ReadSyncMethods,
reader func(files []*LogFile, verbose bool, methods ReadSyncMethods) (U, int, error),
) (records U, dropped int, err error) {
) (recordBlobs U, dropped int, err error) {
// TODO: IMPROVEME: Don't hold the lock while reading, it's not necessary, caching is per-file
// and does not interact with the cluster. But be sure to get pc.cfg while holding the lock.
pc.Lock()
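
The tilde in the new U ~[][]*V constraint admits any type whose underlying type is [][]*V, not only the slice-of-slices type literal. A compact illustration with hypothetical names (only the constraint shape comes from this commit):

// SampleBlobs is a defined type whose underlying type is [][]*Sample.
type SampleBlobs [][]*Sample

func blobCount[V any, U ~[][]*V](u U) int {
	return len(u) // number of inner slices, normally one per file
}

// Both instantiations compile because ~ matches on the underlying type:
//
//	blobCount([][]*Sample{}) // U = [][]*Sample, V = Sample
//	blobCount(SampleBlobs{}) // U = SampleBlobs, V = Sample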
2 changes: 1 addition & 1 deletion code/sonalyze/db/sacctfile.go
@@ -53,6 +53,6 @@ func readSacctSlice(
files []*LogFile,
verbose bool,
reader ReadSyncMethods,
) ([]*SacctInfo, int, error) {
) ([][]*SacctInfo, int, error) {
return readRecordsFromFiles[SacctInfo](files, verbose, reader)
}
4 changes: 2 additions & 2 deletions code/sonalyze/db/samplefile.go
@@ -100,14 +100,14 @@ func readSampleSlice(
files []*LogFile,
verbose bool,
reader ReadSyncMethods,
) (samples []*Sample, dropped int, err error) {
) (sampleBlobs [][]*Sample, dropped int, err error) {
return readRecordsFromFiles[Sample](files, verbose, reader)
}

func readLoadDatumSlice(
files []*LogFile,
verbose bool,
reader ReadSyncMethods,
) (samples []*LoadDatum, dropped int, err error) {
) (loadDataBlobs [][]*LoadDatum, dropped int, err error) {
return readRecordsFromFiles[LoadDatum](files, verbose, reader)
}
2 changes: 1 addition & 1 deletion code/sonalyze/db/sysinfofile.go
@@ -39,6 +39,6 @@ func readNodeConfigRecordSlice(
files []*LogFile,
verbose bool,
reader ReadSyncMethods,
) ([]*config.NodeConfigRecord, int, error) {
) ([][]*config.NodeConfigRecord, int, error) {
return readRecordsFromFiles[config.NodeConfigRecord](files, verbose, reader)
}