Skip to content

Commit

Permalink
Vault 936: use core.activeContext in ActivityLog (#13083) (#13099)
Browse files: browse the repository at this point in the history
* update activity log to use core's activeContext for cleaner worker termination

* update tests to use core activeContext instead of generic context

* pass context around instead

* revert context change

* undo test context changes

* change worker context

* accidentally undid the context changes to function signatures
  • Loading branch information
swayne275 authored Nov 9, 2021
1 parent 28c0f8f commit ff185c0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
47 changes: 22 additions & 25 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,12 +689,10 @@ func (a *ActivityLog) resetCurrentLog() {
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
}

func (a *ActivityLog) deleteLogWorker(startTimestamp int64, whenDone chan struct{}) {
ctx := namespace.RootContext(nil)
func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp)
tokenPath := fmt.Sprintf("%v%v/", activityTokenBasePath, startTimestamp)

// TODO: handle seal gracefully, if we're still working?
entitySegments, err := a.view.List(ctx, entityPath)
if err != nil {
a.logger.Error("could not list entity paths", "error", err)
Expand Down Expand Up @@ -769,7 +767,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
a.logger.Debug("activity log not enabled, skipping refresh from storage")
if !a.core.perfStandby && timeutil.IsCurrentMonth(mostRecent, now) {
a.logger.Debug("activity log is disabled, cleaning up logs for the current month")
go a.deleteLogWorker(mostRecent.Unix(), make(chan struct{}))
go a.deleteLogWorker(ctx, mostRecent.Unix(), make(chan struct{}))
}

return nil
Expand Down Expand Up @@ -880,7 +878,8 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
if !a.enabled && a.currentSegment.startTimestamp != 0 {
a.logger.Trace("deleting current segment")
a.deleteDone = make(chan struct{})
go a.deleteLogWorker(a.currentSegment.startTimestamp, a.deleteDone)
// this is called from a request under stateLock, so use activeContext
go a.deleteLogWorker(a.core.activeContext, a.currentSegment.startTimestamp, a.deleteDone)
a.resetCurrentLog()
}

Expand Down Expand Up @@ -909,7 +908,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
a.retentionMonths = config.RetentionMonths

// check for segments out of retention period, if it has changed
go a.retentionWorker(time.Now(), a.retentionMonths)
go a.retentionWorker(ctx, time.Now(), a.retentionMonths)
}

// update the enable flag and reset the current log
Expand Down Expand Up @@ -971,18 +970,18 @@ func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {
// Lock already held here, can't use .PerfStandby()
// The workers need to know the current segment time.
if c.perfStandby {
go manager.perfStandbyFragmentWorker()
go manager.perfStandbyFragmentWorker(ctx)
} else {
go manager.activeFragmentWorker()
go manager.activeFragmentWorker(ctx)

// Check for any intent log, in the background
go manager.precomputedQueryWorker()
go manager.precomputedQueryWorker(ctx)

// Catch up on garbage collection
// Signal when this is done so that unit tests can proceed.
manager.retentionDone = make(chan struct{})
go func() {
manager.retentionWorker(time.Now(), manager.retentionMonths)
manager.retentionWorker(ctx, time.Now(), manager.retentionMonths)
close(manager.retentionDone)
}()
}
Expand Down Expand Up @@ -1026,7 +1025,7 @@ func (a *ActivityLog) StartOfNextMonth() time.Time {

// perfStandbyFragmentWorker handles scheduling fragments
// to send via RPC; it runs on perf standby nodes only.
func (a *ActivityLog) perfStandbyFragmentWorker() {
func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
timer := time.NewTimer(time.Duration(0))
fragmentWaiting := false
// Eat first event, so timer is stopped
Expand All @@ -1038,7 +1037,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {
}

sendFunc := func() {
ctx, cancel := context.WithTimeout(context.Background(), activityFragmentSendTimeout)
ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout)
defer cancel()
err := a.sendCurrentFragment(ctx)
if err != nil {
Expand Down Expand Up @@ -1111,7 +1110,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {

// activeFragmentWorker handles scheduling the write of the next
// segment. It runs on active nodes only.
func (a *ActivityLog) activeFragmentWorker() {
func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
ticker := time.NewTicker(activitySegmentInterval)

endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now()))
Expand All @@ -1120,7 +1119,7 @@ func (a *ActivityLog) activeFragmentWorker() {
}

writeFunc := func() {
ctx, cancel := context.WithTimeout(context.Background(), activitySegmentWriteTimeout)
ctx, cancel := context.WithTimeout(ctx, activitySegmentWriteTimeout)
defer cancel()
err := a.saveCurrentSegmentToStorage(ctx, false)
if err != nil {
Expand Down Expand Up @@ -1158,15 +1157,15 @@ func (a *ActivityLog) activeFragmentWorker() {
// Simpler, but ticker.Reset was introduced in go 1.15:
// ticker.Reset(activitySegmentInterval)
case currentTime := <-endOfMonth.C:
err := a.HandleEndOfMonth(currentTime.UTC())
err := a.HandleEndOfMonth(ctx, currentTime.UTC())
if err != nil {
a.logger.Error("failed to perform end of month rotation", "error", err)
}

// Garbage collect any segments or queries based on the immediate
// value of retentionMonths.
a.l.RLock()
go a.retentionWorker(currentTime.UTC(), a.retentionMonths)
go a.retentionWorker(ctx, currentTime.UTC(), a.retentionMonths)
a.l.RUnlock()

delta := a.StartOfNextMonth().Sub(time.Now())
Expand All @@ -1187,9 +1186,7 @@ type ActivityIntentLog struct {

// Handle rotation to end-of-month
// currentTime is an argument for unit-testing purposes
func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error {
ctx := namespace.RootContext(nil)

func (a *ActivityLog) HandleEndOfMonth(ctx context.Context, currentTime time.Time) error {
// Hold lock to prevent segment or enable changing,
// disable will apply to *next* month.
a.l.Lock()
Expand Down Expand Up @@ -1245,7 +1242,7 @@ func (a *ActivityLog) HandleEndOfMonth(currentTime time.Time) error {
a.fragmentLock.Unlock()

// Work on precomputed queries in background
go a.precomputedQueryWorker()
go a.precomputedQueryWorker(ctx)

return nil
}
Expand Down Expand Up @@ -1522,8 +1519,8 @@ func (a *ActivityLog) namespaceToLabel(ctx context.Context, nsID string) string
// goroutine to process the request in the intent log, creating precomputed queries.
// We expect the return value won't be checked, so log errors as they occur
// (but for unit testing having the error return should help.)
func (a *ActivityLog) precomputedQueryWorker() error {
ctx, cancel := context.WithCancel(namespace.RootContext(nil))
func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Cancel the context if activity log is shut down.
Expand Down Expand Up @@ -1700,8 +1697,8 @@ func (a *ActivityLog) precomputedQueryWorker() error {
// the retention period.
// We expect the return value won't be checked, so log errors as they occur
// (but for unit testing having the error return should help.)
func (a *ActivityLog) retentionWorker(currentTime time.Time, retentionMonths int) error {
ctx, cancel := context.WithCancel(namespace.RootContext(nil))
func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time, retentionMonths int) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Cancel the context if activity log is shut down.
Expand All @@ -1727,7 +1724,7 @@ func (a *ActivityLog) retentionWorker(currentTime time.Time, retentionMonths int
// One at a time seems OK
if t.Before(retentionThreshold) {
a.logger.Trace("deleting segments", "startTime", t)
a.deleteLogWorker(t.Unix(), make(chan struct{}))
a.deleteLogWorker(ctx, t.Unix(), make(chan struct{}))
}
}

Expand Down
20 changes: 11 additions & 9 deletions vault/activity_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,7 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
doneCh := make(chan struct{})
timeout := time.After(20 * time.Second)

go a.deleteLogWorker(1111, doneCh)
go a.deleteLogWorker(namespace.RootContext(nil), 1111, doneCh)
select {
case <-doneCh:
break
Expand Down Expand Up @@ -1774,7 +1774,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
month2 := timeutil.StartOfNextMonth(month1)

// Trigger end-of-month
a.HandleEndOfMonth(month1)
a.HandleEndOfMonth(ctx, month1)

// Check segment is present, with 1 entity
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, segment0)
Expand Down Expand Up @@ -1812,7 +1812,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {

a.AddEntityToFragment(id2, "root", time.Now().Unix())

a.HandleEndOfMonth(month2)
a.HandleEndOfMonth(ctx, month2)
segment2 := a.GetStartTimestamp()

a.AddEntityToFragment(id3, "root", time.Now().Unix())
Expand Down Expand Up @@ -2127,7 +2127,7 @@ func TestActivityLog_Precompute(t *testing.T) {
// Pretend we've successfully rolled over to the following month
a.SetStartTimestamp(tc.NextMonth)

err = a.precomputedQueryWorker()
err = a.precomputedQueryWorker(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -2253,7 +2253,8 @@ func TestActivityLog_PrecomputeCancel(t *testing.T) {

// This will block if the shutdown didn't work.
go func() {
a.precomputedQueryWorker()
// We expect this to error because of BlockingInmemStorage
_ = a.precomputedQueryWorker(namespace.RootContext(nil))
close(done)
}()

Expand Down Expand Up @@ -2391,9 +2392,10 @@ func TestActivityLog_Deletion(t *testing.T) {
}
}

ctx := namespace.RootContext(nil)
t.Log("24 months")
now := times[len(times)-1]
err := a.retentionWorker(now, 24)
err := a.retentionWorker(ctx, now, 24)
if err != nil {
t.Fatal(err)
}
Expand All @@ -2402,7 +2404,7 @@ func TestActivityLog_Deletion(t *testing.T) {
}

t.Log("12 months")
err = a.retentionWorker(now, 12)
err = a.retentionWorker(ctx, now, 12)
if err != nil {
t.Fatal(err)
}
Expand All @@ -2414,7 +2416,7 @@ func TestActivityLog_Deletion(t *testing.T) {
}

t.Log("1 month")
err = a.retentionWorker(now, 1)
err = a.retentionWorker(ctx, now, 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -2425,7 +2427,7 @@ func TestActivityLog_Deletion(t *testing.T) {
checkPresent(21)

t.Log("0 months")
err = a.retentionWorker(now, 0)
err = a.retentionWorker(ctx, now, 0)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit ff185c0

Please sign in to comment.