Skip to content

Commit

Permalink
ActivityLog Implement HyperLogLog Store Functionality During Precompu…
Browse files Browse the repository at this point in the history
…tation (#16146)

* adding hll for each month

* add changelog

* removing influxdb

* removing influxdb

* removing influxdb

* changing switch to if-else for semgrep
  • Loading branch information
akshya96 authored Jun 27, 2022
1 parent 53bfb72 commit cf60460
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 6 deletions.
3 changes: 3 additions & 0 deletions changelog/16146.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core/activity: generate hyperloglogs containing clientIds for each month during precomputation
```
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/armon/go-radix v1.0.0
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
github.com/aws/aws-sdk-go v1.43.4
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a
github.com/cenkalti/backoff/v3 v3.2.2
github.com/chrismalek/oktasdk-go v0.0.0-20181212195951-3430665dfaa0
github.com/client9/misspell v0.3.4
Expand Down Expand Up @@ -255,6 +256,7 @@ require (
github.com/couchbase/gocbcore/v10 v10.0.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
github.com/digitalocean/godo v1.7.5 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/docker/cli v20.10.9+incompatible // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 h1:1Pls85C5CFjhE3aH+h85/hyAk89kQ
github.com/aws/aws-sdk-go-v2/service/sts v1.6.1/go.mod h1:hLZ/AnkIKHLuPGjEiyghNEdvJ2PP0MgOxcmv9EBJ4xs=
github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg=
github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a h1:eqjiAL3qooftPm8b9C1GsSSRcmlw7iOva8vdBTmV2PY=
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a/go.mod h1:2stgcRjl6QmW+gU2h5E7BQXg4HU0gzxKWDuT5HviN9s=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
Expand Down Expand Up @@ -499,6 +501,8 @@ github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba h1:p6poVbjHDkK
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/digitalocean/godo v1.7.5 h1:JOQbAO6QT1GGjor0doT0mXefX2FgUDPOpYh2RaXA+ko=
github.com/digitalocean/godo v1.7.5/go.mod h1:h6faOIcZ8lWIwNQ+DN7b3CgX4Kwby5T+nbpNqkUIozU=
Expand Down Expand Up @@ -1093,6 +1097,7 @@ github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb v1.7.6/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
Expand Down
82 changes: 76 additions & 6 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"
"unicode/utf8"

"github.com/axiomhq/hyperloglog"
"github.com/golang/protobuf/proto"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/metricsutil"
Expand All @@ -36,6 +37,9 @@ const (
activityConfigKey = "config"
activityIntentLogKey = "endofmonth"

// sketch for each month that stores hash of client ids
distinctClientsBasePath = "log/distinctclients/"

// for testing purposes (public as needed)
ActivityLogPrefix = "sys/counters/activity/log/"
ActivityPrefix = "sys/counters/activity/"
Expand Down Expand Up @@ -363,6 +367,53 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
return nil
}

// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage.
// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients
// In activity log, hyperloglog is a sketch containing clientID's in a given month
func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) *hyperloglog.Sketch {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
hll := hyperloglog.New()
a.logger.Trace("fetching hyperloglog ", "path", monthlyHLLPath)
data, err := a.view.Get(ctx, monthlyHLLPath)
if err != nil {
a.logger.Error("error fetching hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
if data == nil {
a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath)
err = a.StoreHyperlogLog(ctx, startTime, hll)
if err != nil {
a.logger.Error("error storing hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
} else {
err = hll.UnmarshalBinary(data.Value)
if err != nil {
a.logger.Error("error unmarshaling hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
}
return hll
}

// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage
func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath)
marshalledHll, err := newHll.MarshalBinary()
if err != nil {
return err
}
err = a.view.Put(ctx, &logical.StorageEntry{
Key: monthlyHLLPath,
Value: marshalledHll,
})
if err != nil {
return err
}
return nil
}

// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber)
Expand Down Expand Up @@ -515,7 +566,7 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime
}

// WalkEntitySegments loads each of the entity segments for a particular start time
func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, walkFn func(*activity.EntityActivityLog, time.Time) error) error {
func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, hll *hyperloglog.Sketch, walkFn func(*activity.EntityActivityLog, time.Time, *hyperloglog.Sketch) error) error {
basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/"
pathList, err := a.view.List(ctx, basePath)
if err != nil {
Expand All @@ -537,7 +588,7 @@ func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Tim
if err != nil {
return fmt.Errorf("unable to parse segment %v%v: %w", basePath, path, err)
}
err = walkFn(out, startTime)
err = walkFn(out, startTime, hll)
if err != nil {
return fmt.Errorf("unable to walk entities: %w", err)
}
Expand Down Expand Up @@ -2069,10 +2120,21 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
byNamespace := make(map[string]*processByNamespace)
byMonth := make(map[int64]*processMonth)

walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error {
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
for _, e := range l.Clients {

processClientRecord(e, byNamespace, byMonth, startTime)

// We maintain an hyperloglog for each month
// hyperloglog is a sketch (hyperloglog data-structure) containing client ID's in a given month
// hyperloglog is used in activity log to get the approximate number new clients in the current billing month
// by counting the number of distinct clients in all the months including current month
// (this can be done by merging the hyperloglog all months with current month hyperloglog)
// and subtracting the number of distinct clients in the current month
// NOTE: current month here is not the month of startTime but the time period from the start of the current month,
// up until the time that this request was made.
hll.Insert([]byte(e.ClientID))

// The byMonth map will be filled in the reverse order of time. For
// example, if the billing period is from Jan to June, the byMonth
// will be filled for June first, May next and so on till Jan. When
Expand Down Expand Up @@ -2144,11 +2206,17 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
break
}

err = a.WalkEntitySegments(ctx, startTime, walkEntities)
hyperloglog := a.CreateOrFetchHyperlogLog(ctx, startTime)
err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities)
if err != nil {
a.logger.Warn("failed to load previous segments", "error", err)
return err
}
// Store the hyperloglog
err = a.StoreHyperlogLog(ctx, startTime, hyperloglog)
if err != nil {
a.logger.Warn("failed to store hyperloglog for month", "start time", startTime, "error", err)
}
err = a.WalkTokenSegments(ctx, startTime, walkTokens)
if err != nil {
a.logger.Warn("failed to load previous token counts", "error", err)
Expand Down Expand Up @@ -2646,7 +2714,8 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
a.logger.Info("starting activity log export", "start_time", startTime, "end_time", endTime, "format", format)

dedupedIds := make(map[string]struct{})
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error {

walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
for _, e := range l.Clients {
if _, ok := dedupedIds[e.ClientID]; ok {
continue
Expand All @@ -2663,8 +2732,9 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
}

// For each month in the filtered list walk all the log segments

for _, startTime := range filteredList {
err := a.WalkEntitySegments(ctx, startTime, walkEntities)
err := a.WalkEntitySegments(ctx, startTime, nil, walkEntities)
if err != nil {
a.logger.Error("failed to load segments for export", "error", err)
return fmt.Errorf("failed to load segments for export: %w", err)
Expand Down
29 changes: 29 additions & 0 deletions vault/activity_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/go-test/deep"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/vault/helper/constants"
Expand Down Expand Up @@ -472,6 +473,34 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
expectedEntityIDs(t, out, ids)
}

// Test to check store hyperloglog and fetch hyperloglog from storage
func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
ctx := context.Background()

a := core.activityLog
a.SetStandbyEnable(ctx, true)
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
currentMonth := timeutil.StartOfMonth(time.Now())
currentMonthHll := hyperloglog.New()
currentMonthHll.Insert([]byte("a"))
currentMonthHll.Insert([]byte("a"))
currentMonthHll.Insert([]byte("b"))
currentMonthHll.Insert([]byte("c"))
currentMonthHll.Insert([]byte("d"))
currentMonthHll.Insert([]byte("d"))

err := a.StoreHyperlogLog(ctx, currentMonth, currentMonthHll)
if err != nil {
t.Fatalf("error storing hyperloglog in storage: %v", err)
}
fetchedHll := a.CreateOrFetchHyperlogLog(ctx, currentMonth)
// check the distinct count stored from hll
if fetchedHll.Estimate() != 4 {
t.Fatalf("wrong number of distinct elements: expected: 5 actual: %v", fetchedHll.Estimate())
}
}

func TestModifyResponseMonthsNilAppend(t *testing.T) {
end := time.Now().UTC()
start := timeutil.StartOfMonth(end).AddDate(0, -5, 0)
Expand Down

0 comments on commit cf60460

Please sign in to comment.