From 06979d522d259a1648de89aa2166d45def28c58b Mon Sep 17 00:00:00 2001 From: Evangelos Skopelitis Date: Wed, 19 Jul 2023 13:21:24 -0400 Subject: [PATCH] backend: Add instance statistics table --- backend/pkg/api/bindata.go | 23 +++ .../0019_add_instance_stats_table.sql | 14 ++ backend/pkg/api/instances.go | 163 ++++++++++++++++++ backend/pkg/api/instances_test.go | 102 +++++++++++ 4 files changed, 302 insertions(+) create mode 100644 backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql diff --git a/backend/pkg/api/bindata.go b/backend/pkg/api/bindata.go index 62763108a..25cad1fb2 100644 --- a/backend/pkg/api/bindata.go +++ b/backend/pkg/api/bindata.go @@ -20,6 +20,7 @@ // db/migrations/0016_add_version_breakdown_indexes.sql (734B) // db/migrations/0017_drop_unused_indexes.sql (297B) // db/migrations/0018_add_sha256_file_field.sql (173B) +// db/migrations/0019_add_instance_stats_table.sql (361B) package api @@ -487,6 +488,26 @@ func dbMigrations0018_add_sha256_file_fieldSql() (*asset, error) { return a, nil } +var _dbMigrations0019_add_instance_stats_tableSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x6c\x90\xcd\x4a\xc6\x30\x10\x45\xf7\xf3\x14\xb3\x4c\x30\x05\x11\xc4\x45\xd1\x95\xaf\xe0\xba\x8c\x71\xb4\xc1\x64\x52\x93\x69\x15\x9f\x5e\x5a\xe9\x8f\xf5\xdb\x0d\x9c\x7b\x93\xc3\x6d\x1a\xbc\x4a\xe1\xad\x90\x32\x3e\x0d\x00\xbe\xf0\x7c\x2a\x3d\x47\xc6\xf0\x8a\x92\x15\xf9\x2b\x54\xad\x18\xa4\x2a\x89\xe7\xae\x2a\x69\x45\x03\x88\x88\x1a\x12\x57\xa5\x34\xec\x97\x7e\x2f\x2d\x19\x63\x74\x4b\xc6\xf7\x24\xc2\xb1\x13\x4a\x8c\x13\x15\xdf\x53\x31\x37\xb7\xf6\x14\x9b\xc1\x86\xef\xce\x74\xe2\x52\x43\x96\x43\xff\xdf\x03\xab\xe0\xac\xaa\x1b\x43\xdf\xb3\x7f\x47\xb3\xd3\x87\x7b\xbc\xb6\xbf\x95\x51\xc2\xc7\xc8\x66\x73\x77\x7f\x64\xdd\xe2\xe4\xd6\xbf\x2d\xd8\x16\xe0\xb8\xd8\x63\xfe\x14\x80\x97\x92\x87\x7d\xb1\x8b\x6b\xb5\xf0\x13\x00\x00\xff\xff\xdc\x2b\xd0\xda\x69\x01\x00\x00") + +func dbMigrations0019_add_instance_stats_tableSqlBytes() ([]byte, error) { + return bindataRead( + _dbMigrations0019_add_instance_stats_tableSql, + "db/migrations/0019_add_instance_stats_table.sql", + ) +} + +func dbMigrations0019_add_instance_stats_tableSql() (*asset, error) { + bytes, err := dbMigrations0019_add_instance_stats_tableSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "db/migrations/0019_add_instance_stats_table.sql", size: 361, mode: os.FileMode(0644), modTime: time.Unix(1, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x58, 0x78, 0xb3, 0x28, 0xce, 0xbb, 0xe7, 0x2, 0x5e, 0xaa, 0xdc, 0x2a, 0x88, 0xe6, 0x60, 0x28, 0x95, 0x87, 0x4a, 0x2d, 0x6e, 0x73, 0x9c, 0x5c, 0x11, 0x23, 0xb0, 0xbb, 0x3e, 0x60, 0x4c, 0x76}} + return a, nil +} + // Asset loads and returns the asset for the given name. // It returns an error if the asset could not be found or // could not be loaded. @@ -598,6 +619,7 @@ var _bindata = map[string]func() (*asset, error){ "db/migrations/0016_add_version_breakdown_indexes.sql": dbMigrations0016_add_version_breakdown_indexesSql, "db/migrations/0017_drop_unused_indexes.sql": dbMigrations0017_drop_unused_indexesSql, "db/migrations/0018_add_sha256_file_field.sql": dbMigrations0018_add_sha256_file_fieldSql, + "db/migrations/0019_add_instance_stats_table.sql": dbMigrations0019_add_instance_stats_tableSql, } // AssetDebug is true if the assets were built with the debug flag enabled. @@ -667,6 +689,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "0016_add_version_breakdown_indexes.sql": {dbMigrations0016_add_version_breakdown_indexesSql, map[string]*bintree{}}, "0017_drop_unused_indexes.sql": {dbMigrations0017_drop_unused_indexesSql, map[string]*bintree{}}, "0018_add_sha256_file_field.sql": {dbMigrations0018_add_sha256_file_fieldSql, map[string]*bintree{}}, + "0019_add_instance_stats_table.sql": {dbMigrations0019_add_instance_stats_tableSql, map[string]*bintree{}}, }}, "sample_data.sql": {dbSample_dataSql, map[string]*bintree{}}, }}, diff --git a/backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql b/backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql new file mode 100644 index 000000000..85283d286 --- /dev/null +++ b/backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql @@ -0,0 +1,14 @@ +-- +migrate Up + +create table if not exists instance_stats ( + timestamp timestamptz not null, + channel_name varchar(25) not null, + arch varchar(7) not null, + version varchar(255) not null, + instances int not null check (instances >= 0), + unique(timestamp, channel_name, arch, version) +); + +-- +migrate Down + +drop table if exists instance_stats; diff --git a/backend/pkg/api/instances.go b/backend/pkg/api/instances.go index 3480f0b7e..85d0e7d26 100644 --- a/backend/pkg/api/instances.go +++ b/backend/pkg/api/instances.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "strconv" + "strings" "time" "github.com/doug-martin/goqu/v9" @@ -48,6 +49,7 @@ const ( const ( validityInterval postgresDuration = "1 days" + defaultInterval time.Duration = 2 * time.Hour ) // Instance represents an instance running one or more applications for which @@ -108,6 +110,14 @@ type InstancesQueryParams struct { SearchValue string `json:"search_value"` } +type InstanceStats struct { + Timestamp time.Time `db:"timestamp" json:"timestamp"` + ChannelName string `db:"channel_name" json:"channel_name"` + Arch string `db:"arch" json:"arch"` + Version string `db:"version" json:"version"` + Instances int `db:"instances" json:"instances"` +} + type instanceFilterItem int const ( @@ -631,3 +641,156 @@ func (api *API) instanceStatusHistoryQuery(instanceID, appID, groupID string, li Order(goqu.C("created_ts").Desc()). Limit(uint(limit)) } + +// instanceStatsQuery returns a SelectDataset prepared to return all instances +// that have been checked in during a given duration from a given time. +func (api *API) instanceStatsQuery(t *time.Time, duration *time.Duration) *goqu.SelectDataset { + if t == nil { + now := time.Now() + t = &now + } + + if duration == nil { + d := defaultInterval + duration = &d + } + + // Helper function to convert duration to PostgreSQL interval string + durationToInterval := func(d time.Duration) string { + if d <= 0 { + d = time.Microsecond + } + + parts := []string{} + + hours := int(d.Hours()) + if hours != 0 { + parts = append(parts, fmt.Sprintf("%d hours", hours)) + } + + remainder := d - time.Duration(hours)*time.Hour + minutes := int(remainder.Minutes()) + if minutes != 0 { + parts = append(parts, fmt.Sprintf("%d minutes", minutes)) + } + + remainder -= time.Duration(minutes) * time.Minute + seconds := int(remainder.Seconds()) + if seconds != 0 { + parts = append(parts, fmt.Sprintf("%d seconds", seconds)) + } + + remainder -= time.Duration(seconds) * time.Second + microseconds := remainder.Microseconds() + if microseconds != 0 { + parts = append(parts, fmt.Sprintf("%d microseconds", microseconds)) + } + + return strings.Join(parts, " ") + } + + interval := durationToInterval(*duration) + timestamp := goqu.L("timestamp ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00"))) + timestampMinusDuration := goqu.L("timestamp ? - interval ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00")), interval) + + query := goqu.From(goqu.T("instance_application")). + Select( + timestamp, + goqu.T("channel").Col("name").As("channel_name"), + goqu.Case(). + When(goqu.T("channel").Col("arch").Eq(1), "AMD64"). + When(goqu.T("channel").Col("arch").Eq(2), "ARM"). + Else(""). + As("arch"), + goqu.C("version").As("version"), + goqu.COUNT("*").As("instances")). + Join(goqu.T("groups"), goqu.On(goqu.C("group_id").Eq(goqu.T("groups").Col("id")))). + Join(goqu.T("channel"), goqu.On(goqu.T("groups").Col("channel_id").Eq(goqu.T("channel").Col("id")))). + Where( + goqu.C("last_check_for_updates").Gt(timestampMinusDuration), + goqu.C("last_check_for_updates").Lte(timestamp)). + GroupBy(timestamp, + goqu.T("channel").Col("name"), + goqu.T("channel").Col("arch"), + goqu.C("version")). + Order(timestamp.Asc()) + + return query +} + +// GetInstanceStats returns an InstanceStats table with all instances that have +// been previously been checked in. +func (api *API) GetInstanceStats() ([]InstanceStats, error) { + query, _, err := goqu.From("instance_stats"). + Order(goqu.C("timestamp").Asc()).ToSQL() + if err != nil { + return nil, err + } + + rows, err := api.db.Queryx(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var instances []InstanceStats + for rows.Next() { + var instance InstanceStats + err = rows.StructScan(&instance) + if err != nil { + return nil, err + } + instances = append(instances, instance) + } + + return instances, nil +} + +// GetInstanceStatsByTimestamp returns an InstanceStats array of instances matching a +// given timestamp value, ordered by version. +func (api *API) GetInstanceStatsByTimestamp(t time.Time) ([]InstanceStats, error) { + timestamp := goqu.L("timestamp ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00"))) + + query, _, err := goqu.From("instance_stats"). + Where(goqu.C("timestamp").Eq(timestamp)). + Order(goqu.C("version").Asc()).ToSQL() + if err != nil { + return nil, err + } + + rows, err := api.db.Queryx(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var instances []InstanceStats + for rows.Next() { + var instance InstanceStats + err = rows.StructScan(&instance) + if err != nil { + return nil, err + } + instances = append(instances, instance) + } + + return instances, nil +} + +// updateInstanceStats updates the instance_stats table with instances checked +// in during a given duration from a given time. +func (api *API) updateInstanceStats(t *time.Time, duration *time.Duration) error { + insertQuery, _, err := goqu.Insert(goqu.T("instance_stats")). + Cols("timestamp", "channel_name", "arch", "version", "instances"). + FromQuery(api.instanceStatsQuery(t, duration)). + ToSQL() + if err != nil { + return err + } + + _, err = api.db.Exec(insertQuery) + if err != nil { + return err + } + return nil +} diff --git a/backend/pkg/api/instances_test.go b/backend/pkg/api/instances_test.go index d40d85107..296dc4d5f 100644 --- a/backend/pkg/api/instances_test.go +++ b/backend/pkg/api/instances_test.go @@ -3,6 +3,7 @@ package api import ( "fmt" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -294,3 +295,104 @@ func TestGetInstanceStatusHistory(t *testing.T) { assert.Equal(t, history[3].Status, InstanceStatusUpdateGranted) assert.Equal(t, history[3].Version, "1.0.1") } + +func TestUpdateInstanceStats(t *testing.T) { + a := newForTest(t) + defer a.Close() + + instances, err := a.GetInstanceStats() + assert.NoError(t, err) + assert.Equal(t, 0, len(instances)) + + // First test case: Create tInstance1, tInstance2, and tInstance3; check tInstance1 twice; switch tInstance2 version + start := time.Now().UTC() + + tTeam, _ := a.AddTeam(&Team{Name: "test_team"}) + tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID}) + tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID, Arch: ArchAMD64}) + tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID), Arch: ArchAMD64}) + tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"}) + tInstance1, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID) + tInstance2, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.2", "1.0.0", tApp.ID, tGroup.ID) + _, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.3", "1.0.1", tApp.ID, tGroup.ID) + + _, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + _, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.1", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + ts := time.Now().UTC() + elapsed := ts.Sub(start) + + err = a.updateInstanceStats(&ts, &elapsed) + assert.NoError(t, err) + + instances, err = a.GetInstanceStats() + assert.NoError(t, err) + assert.Equal(t, 2, len(instances)) + + instanceStats, err := a.GetInstanceStatsByTimestamp(ts) + assert.NoError(t, err) + assert.Equal(t, 2, len(instanceStats)) + assert.Equal(t, "1.0.0", instanceStats[0].Version) + assert.Equal(t, 1, instanceStats[0].Instances) + assert.Equal(t, "1.0.1", instanceStats[1].Version) + assert.Equal(t, 2, instanceStats[1].Instances) + + // Next test case: Switch tInstance1 and tInstance2 versions to workaround the 5-minutes-rate-limiting of the check-in time and add new instance + ts2 := time.Now().UTC() + + _, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.3", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + _, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.4", tApp.ID, tGroup.ID) + assert.NoError(t, err) + + _, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.4", "1.0.5", tApp.ID, tGroup.ID) + + ts3 := time.Now().UTC() + elapsed = ts3.Sub(ts2) + + err = a.updateInstanceStats(&ts3, &elapsed) + assert.NoError(t, err) + + instances, err = a.GetInstanceStats() + assert.NoError(t, err) + assert.Equal(t, 5, len(instances)) + + instanceStats, err = a.GetInstanceStatsByTimestamp(ts3) + assert.NoError(t, err) + assert.Equal(t, 3, len(instanceStats)) + assert.Equal(t, "1.0.3", instanceStats[0].Version) + assert.Equal(t, 1, instanceStats[0].Instances) + assert.Equal(t, "1.0.4", instanceStats[1].Version) + assert.Equal(t, 1, instanceStats[1].Instances) + assert.Equal(t, "1.0.5", instanceStats[2].Version) + assert.Equal(t, 1, instanceStats[2].Instances) +} + +func TestUpdateInstanceStatsNoArch(t *testing.T) { + a := newForTest(t) + defer a.Close() + + tTeam, _ := a.AddTeam(&Team{Name: "test_team"}) + tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID}) + tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID}) + tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID)}) + tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"}) + _, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID) + + ts := time.Now().UTC() + // Use large duration to have some test coverage for durationToInterval + elapsed := 3*time.Hour + 45*time.Minute + 30*time.Second + 1000*time.Microsecond + + err := a.updateInstanceStats(&ts, &elapsed) + assert.NoError(t, err) + + instanceStats, err := a.GetInstanceStatsByTimestamp(ts) + assert.NoError(t, err) + assert.Equal(t, "", instanceStats[0].Arch) + assert.Equal(t, "1.0.0", instanceStats[0].Version) + assert.Equal(t, 1, instanceStats[0].Instances) +}