Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ActivityLog Implement HyperLogLog Store Functionality During Precomputation #16146

Merged
merged 6 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/16146.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core/activity: generate hyperloglogs containing clientIds for each month during precomputation
```
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.2 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.12.0 // indirect
github.com/aws/smithy-go v1.7.0 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect
Expand All @@ -253,6 +254,7 @@ require (
github.com/couchbase/gocbcore/v10 v10.0.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
github.com/digitalocean/godo v1.7.5 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/docker/cli v20.10.9+incompatible // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 h1:1Pls85C5CFjhE3aH+h85/hyAk89kQ
github.com/aws/aws-sdk-go-v2/service/sts v1.6.1/go.mod h1:hLZ/AnkIKHLuPGjEiyghNEdvJ2PP0MgOxcmv9EBJ4xs=
github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg=
github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a h1:eqjiAL3qooftPm8b9C1GsSSRcmlw7iOva8vdBTmV2PY=
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a/go.mod h1:2stgcRjl6QmW+gU2h5E7BQXg4HU0gzxKWDuT5HviN9s=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
Expand Down Expand Up @@ -500,6 +502,8 @@ github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba h1:p6poVbjHDkK
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/digitalocean/godo v1.7.5 h1:JOQbAO6QT1GGjor0doT0mXefX2FgUDPOpYh2RaXA+ko=
github.com/digitalocean/godo v1.7.5/go.mod h1:h6faOIcZ8lWIwNQ+DN7b3CgX4Kwby5T+nbpNqkUIozU=
Expand Down
84 changes: 78 additions & 6 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"
"unicode/utf8"

"github.com/axiomhq/hyperloglog"
"github.com/golang/protobuf/proto"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/metricsutil"
Expand All @@ -36,6 +37,9 @@ const (
activityConfigKey = "config"
activityIntentLogKey = "endofmonth"

// sketch for each month that stores hash of client ids
distinctClientsBasePath = "log/distinctclients/"

// for testing purposes (public as needed)
ActivityLogPrefix = "sys/counters/activity/log/"
ActivityPrefix = "sys/counters/activity/"
Expand Down Expand Up @@ -363,6 +367,55 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
return nil
}

// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage.
// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients
// In activity log, hyperloglog is a sketch containing clientID's in a given month
func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) *hyperloglog.Sketch {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
hll := hyperloglog.New()
a.logger.Trace("fetching hyperloglog ", "path", monthlyHLLPath)
data, err := a.view.Get(ctx, monthlyHLLPath)
if err != nil {
a.logger.Error("error fetching hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
switch {
case data == nil: // no data at given path, create new hyperlogLog
a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath)
err = a.StoreHyperlogLog(ctx, startTime, hll)
if err != nil {
a.logger.Error("error storing hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}

default:
err = hll.UnmarshalBinary(data.Value)
if err != nil {
a.logger.Error("error unmarshaling hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
}
return hll
}

// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage
func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath)
marshalledHll, err := newHll.MarshalBinary()
if err != nil {
return err
}
err = a.view.Put(ctx, &logical.StorageEntry{
Key: monthlyHLLPath,
Value: marshalledHll,
})
if err != nil {
return err
}
return nil
}

// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber)
Expand Down Expand Up @@ -515,7 +568,7 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime
}

// WalkEntitySegments loads each of the entity segments for a particular start time
func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, walkFn func(*activity.EntityActivityLog, time.Time) error) error {
func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, hll *hyperloglog.Sketch, walkFn func(*activity.EntityActivityLog, time.Time, *hyperloglog.Sketch) error) error {
basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/"
pathList, err := a.view.List(ctx, basePath)
if err != nil {
Expand All @@ -537,7 +590,7 @@ func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Tim
if err != nil {
return fmt.Errorf("unable to parse segment %v%v: %w", basePath, path, err)
}
err = walkFn(out, startTime)
err = walkFn(out, startTime, hll)
if err != nil {
return fmt.Errorf("unable to walk entities: %w", err)
}
Expand Down Expand Up @@ -2069,10 +2122,21 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
byNamespace := make(map[string]*processByNamespace)
byMonth := make(map[int64]*processMonth)

walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error {
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
for _, e := range l.Clients {

processClientRecord(e, byNamespace, byMonth, startTime)

// We maintain an hyperloglog for each month
// hyperloglog is a sketch (hyperloglog data-structure) containing client ID's in a given month
// hyperloglog is used in activity log to get the approximate number new clients in the current billing month
// by counting the number of distinct clients in all the months including current month
// (this can be done by merging the hyperloglog all months with current month hyperloglog)
// and subtracting the number of distinct clients in the current month
// NOTE: current month here is not the month of startTime but the time period from the start of the current month,
// up until the time that this request was made.
hll.Insert([]byte(e.ClientID))

// The byMonth map will be filled in the reverse order of time. For
// example, if the billing period is from Jan to June, the byMonth
// will be filled for June first, May next and so on till Jan. When
Expand Down Expand Up @@ -2144,11 +2208,17 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
break
}

err = a.WalkEntitySegments(ctx, startTime, walkEntities)
hyperloglog := a.CreateOrFetchHyperlogLog(ctx, startTime)
err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities)
if err != nil {
a.logger.Warn("failed to load previous segments", "error", err)
return err
}
// Store the hyperloglog
err = a.StoreHyperlogLog(ctx, startTime, hyperloglog)
if err != nil {
a.logger.Warn("failed to store hyperloglog for month", "start time", startTime, "error", err)
}
err = a.WalkTokenSegments(ctx, startTime, walkTokens)
if err != nil {
a.logger.Warn("failed to load previous token counts", "error", err)
Expand Down Expand Up @@ -2646,7 +2716,8 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
a.logger.Info("starting activity log export", "start_time", startTime, "end_time", endTime, "format", format)

dedupedIds := make(map[string]struct{})
walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error {

walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error {
for _, e := range l.Clients {
if _, ok := dedupedIds[e.ClientID]; ok {
continue
Expand All @@ -2663,8 +2734,9 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
}

// For each month in the filtered list walk all the log segments

for _, startTime := range filteredList {
err := a.WalkEntitySegments(ctx, startTime, walkEntities)
err := a.WalkEntitySegments(ctx, startTime, nil, walkEntities)
if err != nil {
a.logger.Error("failed to load segments for export", "error", err)
return fmt.Errorf("failed to load segments for export: %w", err)
Expand Down
29 changes: 29 additions & 0 deletions vault/activity_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/go-test/deep"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/vault/helper/constants"
Expand Down Expand Up @@ -472,6 +473,34 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
expectedEntityIDs(t, out, ids)
}

// Test to check store hyperloglog and fetch hyperloglog from storage
func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
ctx := context.Background()

a := core.activityLog
a.SetStandbyEnable(ctx, true)
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
currentMonth := timeutil.StartOfMonth(time.Now())
currentMonthHll := hyperloglog.New()
currentMonthHll.Insert([]byte("a"))
currentMonthHll.Insert([]byte("a"))
currentMonthHll.Insert([]byte("b"))
currentMonthHll.Insert([]byte("c"))
currentMonthHll.Insert([]byte("d"))
currentMonthHll.Insert([]byte("d"))

err := a.StoreHyperlogLog(ctx, currentMonth, currentMonthHll)
if err != nil {
t.Fatalf("error storing hyperloglog in storage: %v", err)
}
fetchedHll := a.CreateOrFetchHyperlogLog(ctx, currentMonth)
// check the distinct count stored from hll
if fetchedHll.Estimate() != 4 {
t.Fatalf("wrong number of distinct elements: expected: 5 actual: %v", fetchedHll.Estimate())
}
}

func TestModifyResponseMonthsNilAppend(t *testing.T) {
end := time.Now().UTC()
start := timeutil.StartOfMonth(end).AddDate(0, -5, 0)
Expand Down