Skip to content

Commit

Permalink
backend: Add instance statistics table
Browse files Browse the repository at this point in the history
A by-product of the Omaha update protocol employed by Nebraska delivers statistics on
version spread, but accumulating and presenting this data suffers from a number of
limitations:

- The chart only represents all nodes’ current OS versions. We currently do not have
  historic data of the nodes’ previous software versions, and we would like to keep track of
  this data.
- While historic data can be calculated from update events, this is computationally
  intense and unfeasible in practice. We currently cannot calculate or store this
  historic data efficiently, and we would like to be able to have quick and easy access
  to this data.
- To be able to back-fill the live Nebraska server using historic data available in CSV
  format, we need to use a third-party tool. We currently cannot accomplish this using
  our existing setup, and we would like to implement a mechanism to do this natively.

We implement the `instance_stats` table (based on instance fields from
`instance_application`) to store data on current version spread in an efficient format
optimized for querying. To accomplish this, there are five data points that we
associate each instance with, serving as the main fields of this table:

- Timestamp: Necessary for keeping track of when update checks are performed
- Channel name: Necessary to identify an individual instance
- Architecture: Necessary to identify an individual instance
- Version: Necessary to identify an individual instance
- Instance count: Necessary for grouping instances by the previous four fields

New entries to the table will be generated using a background job scheduled
periodically at a specified interval.
  • Loading branch information
skoeva committed Jul 19, 2023
1 parent 03a5143 commit d7a1836
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 0 deletions.
23 changes: 23 additions & 0 deletions backend/pkg/api/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions backend/pkg/api/db/migrations/0019_add_instance_stats_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- +migrate Up

-- instance_stats stores periodic snapshots of the version spread: one row per
-- (timestamp, channel_name, arch, version) combination, with the number of
-- instances that fell into that combination.
create table if not exists instance_stats (
-- Time associated with the snapshot the row belongs to.
timestamp timestamptz not null,
-- Name of the channel the counted instances are attached to.
channel_name varchar(25) not null,
-- Architecture label of the channel.
arch varchar(7) not null,
-- Version reported by the counted instances.
version varchar(255) not null,
-- Number of instances in this group; never negative.
instances int not null check (instances >= 0),
-- At most one row per group within a single snapshot.
unique(timestamp, channel_name, arch, version)
);

-- +migrate Down

drop table if exists instance_stats;
163 changes: 163 additions & 0 deletions backend/pkg/api/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/doug-martin/goqu/v9"
Expand Down Expand Up @@ -48,6 +49,7 @@ const (

const (
// validityInterval is a one-day interval expressed as a postgresDuration.
// NOTE(review): its consumers are outside this view; presumably it bounds
// how recent a check-in must be for an instance to count — confirm.
validityInterval postgresDuration = "1 days"
// defaultInterval is the look-back window instanceStatsQuery falls back to
// when it is called with a nil duration.
defaultInterval time.Duration = 2 * time.Hour
)

// Instance represents an instance running one or more applications for which
Expand Down Expand Up @@ -108,6 +110,14 @@ type InstancesQueryParams struct {
SearchValue string `json:"search_value"`
}

// InstanceStats is a single row of the instance_stats table: the number of
// instances counted for one channel name, architecture and version at a given
// timestamp.
type InstanceStats struct {
// Timestamp is the time associated with the snapshot this row belongs to.
Timestamp time.Time `db:"timestamp" json:"timestamp"`
// ChannelName is the name of the channel the instances were grouped under.
ChannelName string `db:"channel_name" json:"channel_name"`
// Arch is the architecture label of the channel.
Arch string `db:"arch" json:"arch"`
// Version is the version the counted instances were grouped under.
Version string `db:"version" json:"version"`
// Instances is the number of instances in this group.
Instances int `db:"instances" json:"instances"`
}

type instanceFilterItem int

const (
Expand Down Expand Up @@ -631,3 +641,156 @@ func (api *API) instanceStatusHistoryQuery(instanceID, appID, groupID string, li
Order(goqu.C("created_ts").Desc()).
Limit(uint(limit))
}

// instanceStatsQuery returns a SelectDataset prepared to return all instances
// that have been checked in during a given duration from a given time.
//
// The selected window is (t - duration, t]: rows of instance_application whose
// last_check_for_updates is strictly after t - duration and not after t. The
// rows are joined with their group and channel and counted per channel name,
// channel arch and version. Every result row carries t as its timestamp.
//
// A nil t defaults to time.Now() and a nil duration defaults to
// defaultInterval. The channel arch is rendered as "AMD64" for 1, "ARM" for 2
// and as an empty string for any other value.
func (api *API) instanceStatsQuery(t *time.Time, duration *time.Duration) *goqu.SelectDataset {
	if t == nil {
		now := time.Now()
		t = &now
	}

	if duration == nil {
		d := defaultInterval
		duration = &d
	}

	// durationToInterval converts a duration to a PostgreSQL interval string
	// such as "3 hours 45 minutes 30 seconds 1000 microseconds".
	durationToInterval := func(d time.Duration) string {
		// Microseconds are the finest unit emitted below, so anything shorter
		// (including zero and negative values) is clamped to one microsecond.
		// Without this, a positive sub-microsecond duration would produce an
		// empty string, which is not a valid interval literal.
		if d < time.Microsecond {
			d = time.Microsecond
		}

		parts := make([]string, 0, 4)

		hours := int(d.Hours())
		if hours != 0 {
			parts = append(parts, fmt.Sprintf("%d hours", hours))
		}

		remainder := d - time.Duration(hours)*time.Hour
		minutes := int(remainder.Minutes())
		if minutes != 0 {
			parts = append(parts, fmt.Sprintf("%d minutes", minutes))
		}

		remainder -= time.Duration(minutes) * time.Minute
		seconds := int(remainder.Seconds())
		if seconds != 0 {
			parts = append(parts, fmt.Sprintf("%d seconds", seconds))
		}

		remainder -= time.Duration(seconds) * time.Second
		microseconds := remainder.Microseconds()
		if microseconds != 0 {
			parts = append(parts, fmt.Sprintf("%d microseconds", microseconds))
		}

		return strings.Join(parts, " ")
	}

	interval := durationToInterval(*duration)
	// The same formatted instant is used for the selected column, the window
	// bounds and the grouping, so format it once.
	formatted := t.Format("2006-01-02T15:04:05.999999Z07:00")
	timestamp := goqu.L("timestamp ?", goqu.V(formatted))
	timestampMinusDuration := goqu.L("timestamp ? - interval ?", goqu.V(formatted), interval)

	query := goqu.From(goqu.T("instance_application")).
		Select(
			timestamp,
			goqu.T("channel").Col("name").As("channel_name"),
			goqu.Case().
				When(goqu.T("channel").Col("arch").Eq(1), "AMD64").
				When(goqu.T("channel").Col("arch").Eq(2), "ARM").
				Else("").
				As("arch"),
			goqu.C("version").As("version"),
			goqu.COUNT("*").As("instances")).
		Join(goqu.T("groups"), goqu.On(goqu.C("group_id").Eq(goqu.T("groups").Col("id")))).
		Join(goqu.T("channel"), goqu.On(goqu.T("groups").Col("channel_id").Eq(goqu.T("channel").Col("id")))).
		Where(
			goqu.C("last_check_for_updates").Gt(timestampMinusDuration),
			goqu.C("last_check_for_updates").Lte(timestamp)).
		GroupBy(timestamp,
			goqu.T("channel").Col("name"),
			goqu.T("channel").Col("arch"),
			goqu.C("version")).
		Order(timestamp.Asc())

	return query
}

// GetInstanceStats returns every row of the instance_stats table, ordered by
// ascending timestamp.
//
// It returns a nil slice and a non-nil error if the query cannot be built or
// executed, if a row cannot be scanned, or if iterating the result set fails.
func (api *API) GetInstanceStats() ([]InstanceStats, error) {
	query, _, err := goqu.From("instance_stats").
		Order(goqu.C("timestamp").Asc()).ToSQL()
	if err != nil {
		return nil, err
	}

	rows, err := api.db.Queryx(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var instances []InstanceStats
	for rows.Next() {
		var instance InstanceStats
		if err := rows.StructScan(&instance); err != nil {
			return nil, err
		}
		instances = append(instances, instance)
	}
	// Next reports false both at the end of the result set and on failure, so
	// the iteration error must be checked to avoid returning a truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return instances, nil
}

// GetInstanceStatsByTimestamp returns an InstanceStats array of instances matching a
// given timestamp value, ordered by version.
//
// The timestamp is compared at microsecond precision, the precision used when
// formatting t for the query. It returns a nil slice and a non-nil error if
// the query cannot be built or executed, if a row cannot be scanned, or if
// iterating the result set fails.
func (api *API) GetInstanceStatsByTimestamp(t time.Time) ([]InstanceStats, error) {
	timestamp := goqu.L("timestamp ?", goqu.V(t.Format("2006-01-02T15:04:05.999999Z07:00")))

	query, _, err := goqu.From("instance_stats").
		Where(goqu.C("timestamp").Eq(timestamp)).
		Order(goqu.C("version").Asc()).ToSQL()
	if err != nil {
		return nil, err
	}

	rows, err := api.db.Queryx(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var instances []InstanceStats
	for rows.Next() {
		var instance InstanceStats
		if err := rows.StructScan(&instance); err != nil {
			return nil, err
		}
		instances = append(instances, instance)
	}
	// Next reports false both at the end of the result set and on failure, so
	// the iteration error must be checked to avoid returning a truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return instances, nil
}

// updateInstanceStats inserts into instance_stats the per-channel, per-arch,
// per-version instance counts produced by instanceStatsQuery for the window of
// the given duration ending at the given time. It returns any error raised
// while building or executing the insert statement.
func (api *API) updateInstanceStats(t *time.Time, duration *time.Duration) error {
	// Build the SELECT first; its result set feeds the INSERT directly.
	statsSelect := api.instanceStatsQuery(t, duration)

	stmt, _, err := goqu.Insert(goqu.T("instance_stats")).
		Cols("timestamp", "channel_name", "arch", "version", "instances").
		FromQuery(statsSelect).
		ToSQL()
	if err != nil {
		return err
	}

	_, err = api.db.Exec(stmt)
	return err
}
102 changes: 102 additions & 0 deletions backend/pkg/api/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -294,3 +295,104 @@ func TestGetInstanceStatusHistory(t *testing.T) {
assert.Equal(t, history[3].Status, InstanceStatusUpdateGranted)
assert.Equal(t, history[3].Version, "1.0.1")
}

// TestUpdateInstanceStats checks that updateInstanceStats writes one row per
// version for the instances that checked in during the requested window, and
// that successive snapshots accumulate in the instance_stats table.
func TestUpdateInstanceStats(t *testing.T) {
	a := newForTest(t)
	defer a.Close()

	instances, err := a.GetInstanceStats()
	assert.NoError(t, err)
	assert.Equal(t, 0, len(instances))

	// First test case: Create tInstance1, tInstance2, and tInstance3; check tInstance1 twice; switch tInstance2 version
	start := time.Now().UTC()

	tTeam, _ := a.AddTeam(&Team{Name: "test_team"})
	tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID})
	tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID, Arch: ArchAMD64})
	tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID), Arch: ArchAMD64})
	tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"})
	tInstance1, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID)
	tInstance2, _ := a.RegisterInstance(uuid.New().String(), "", "10.0.0.2", "1.0.0", tApp.ID, tGroup.ID)
	_, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.3", "1.0.1", tApp.ID, tGroup.ID)

	_, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID)
	assert.NoError(t, err)

	_, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.1", tApp.ID, tGroup.ID)
	assert.NoError(t, err)

	ts := time.Now().UTC()
	elapsed := ts.Sub(start)

	err = a.updateInstanceStats(&ts, &elapsed)
	assert.NoError(t, err)

	instances, err = a.GetInstanceStats()
	assert.NoError(t, err)
	assert.Equal(t, 2, len(instances))

	instanceStats, err := a.GetInstanceStatsByTimestamp(ts)
	assert.NoError(t, err)
	// Stop here on a length mismatch: indexing below would otherwise panic
	// and bury the assertion failure.
	if !assert.Equal(t, 2, len(instanceStats)) {
		return
	}
	assert.Equal(t, "1.0.0", instanceStats[0].Version)
	assert.Equal(t, 1, instanceStats[0].Instances)
	assert.Equal(t, "1.0.1", instanceStats[1].Version)
	assert.Equal(t, 2, instanceStats[1].Instances)

	// Next test case: Switch tInstance1 and tInstance2 versions to workaround the 5-minutes-rate-limiting of the check-in time and add new instance
	ts2 := time.Now().UTC()

	_, err = a.GetUpdatePackage(tInstance1.ID, "", "10.0.0.1", "1.0.3", tApp.ID, tGroup.ID)
	assert.NoError(t, err)

	_, err = a.GetUpdatePackage(tInstance2.ID, "", "10.0.0.2", "1.0.4", tApp.ID, tGroup.ID)
	assert.NoError(t, err)

	_, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.4", "1.0.5", tApp.ID, tGroup.ID)

	ts3 := time.Now().UTC()
	elapsed = ts3.Sub(ts2)

	err = a.updateInstanceStats(&ts3, &elapsed)
	assert.NoError(t, err)

	instances, err = a.GetInstanceStats()
	assert.NoError(t, err)
	assert.Equal(t, 5, len(instances))

	instanceStats, err = a.GetInstanceStatsByTimestamp(ts3)
	assert.NoError(t, err)
	// Same guard as above: never index past a failed length assertion.
	if !assert.Equal(t, 3, len(instanceStats)) {
		return
	}
	assert.Equal(t, "1.0.3", instanceStats[0].Version)
	assert.Equal(t, 1, instanceStats[0].Instances)
	assert.Equal(t, "1.0.4", instanceStats[1].Version)
	assert.Equal(t, 1, instanceStats[1].Instances)
	assert.Equal(t, "1.0.5", instanceStats[2].Version)
	assert.Equal(t, 1, instanceStats[2].Instances)
}

// TestUpdateInstanceStatsNoArch checks that an instance attached to a channel
// created without an architecture is recorded with an empty arch label, and
// exercises durationToInterval with a duration spanning every unit it emits.
func TestUpdateInstanceStatsNoArch(t *testing.T) {
	a := newForTest(t)
	defer a.Close()

	tTeam, _ := a.AddTeam(&Team{Name: "test_team"})
	tApp, _ := a.AddApp(&Application{Name: "test_app", TeamID: tTeam.ID})
	tPkg, _ := a.AddPackage(&Package{Type: PkgTypeOther, URL: "http://sample.url/pkg", Version: "12.1.0", ApplicationID: tApp.ID})
	tChannel, _ := a.AddChannel(&Channel{Name: "test_channel", Color: "blue", ApplicationID: tApp.ID, PackageID: null.StringFrom(tPkg.ID)})
	tGroup, _ := a.AddGroup(&Group{Name: "group1", ApplicationID: tApp.ID, ChannelID: null.StringFrom(tChannel.ID), PolicyUpdatesEnabled: true, PolicySafeMode: false, PolicyPeriodInterval: "15 minutes", PolicyMaxUpdatesPerPeriod: 2, PolicyUpdateTimeout: "60 minutes"})
	_, _ = a.RegisterInstance(uuid.New().String(), "", "10.0.0.1", "1.0.0", tApp.ID, tGroup.ID)

	ts := time.Now().UTC()
	// Use large duration to have some test coverage for durationToInterval
	elapsed := 3*time.Hour + 45*time.Minute + 30*time.Second + 1000*time.Microsecond

	err := a.updateInstanceStats(&ts, &elapsed)
	assert.NoError(t, err)

	instanceStats, err := a.GetInstanceStatsByTimestamp(ts)
	assert.NoError(t, err)
	// Fail cleanly instead of panicking on instanceStats[0] when no row was
	// written or the lookup failed.
	if !assert.NotEmpty(t, instanceStats) {
		return
	}
	assert.Equal(t, "", instanceStats[0].Arch)
	assert.Equal(t, "1.0.0", instanceStats[0].Version)
	assert.Equal(t, 1, instanceStats[0].Instances)
}

0 comments on commit d7a1836

Please sign in to comment.