diff --git a/vault/activity_log.go b/vault/activity_log.go index 1a9b23f4038d..757165f3e1f1 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -46,9 +46,7 @@ const ( activityGlobalPathPrefix = "global/" activityLocalPathPrefix = "local/" - activityACMERegenerationKey = "acme-regeneration" - activityDeduplicationUpgradeKey = "deduplication-upgrade" - + activityACMERegenerationKey = "acme-regeneration" // sketch for each month that stores hash of client ids distinctClientsBasePath = "log/distinctclients/" @@ -116,8 +114,6 @@ const ( // CSV encoder. Indexes will be generated to ensure that values are slotted into the // correct column. This initial value is used prior to finalizing the CSV header. exportCSVFlatteningInitIndex = -1 - - DeduplicatedClientMinimumVersion = "1.19.0" ) var ( @@ -200,11 +196,6 @@ type ActivityLog struct { // Channel to stop background processing doneCh chan struct{} - // Channel to signal global clients have received by the primary from the secondary, during upgrade to 1.19 - dedupUpgradeGlobalClientsReceivedCh chan struct{} - // track whether the current cluster is in the middle of an upgrade to 1.19 - dedupClientsUpgradeComplete *atomic.Bool - // track metadata and contents of the most recent log segment currentSegment segmentInfo @@ -233,18 +224,8 @@ type ActivityLog struct { // channel closed when deletion at startup is done // (for unit test robustness) - retentionDone chan struct{} - // This channel is relevant for upgrades to 1.17. It indicates whether precomputed queries have been - // generated for ACME clients. + retentionDone chan struct{} computationWorkerDone chan struct{} - // This channel is relevant for upgrades to 1.19+ (version with deduplication of clients) - // This indicates that paths that were used before 1.19 to store clients have been cleaned - oldStoragePathsCleaned chan struct{} - - // channel to indicate that a global clients have been - // sent to the primary from a secondary - globalClientsSent chan struct{} - clientsReceivedForMigration map[int64][]*activity.LogFragment // for testing: is config currently being invalidated. protected by l configInvalidationInProgress bool @@ -369,21 +350,19 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me clock = timeutil.DefaultClock{} } a := &ActivityLog{ - core: core, - configOverrides: &core.activityLogConfig, - logger: logger, - view: view, - metrics: metrics, - nodeID: hostname, - newFragmentCh: make(chan struct{}, 1), - sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size - doneCh: make(chan struct{}, 1), - partialMonthLocalClientTracker: make(map[string]*activity.EntityRecord), - newGlobalClientFragmentCh: make(chan struct{}, 1), - dedupUpgradeGlobalClientsReceivedCh: make(chan struct{}, 1), - clientsReceivedForMigration: make(map[int64][]*activity.LogFragment), - globalPartialMonthClientTracker: make(map[string]*activity.EntityRecord), - clock: clock, + core: core, + configOverrides: &core.activityLogConfig, + logger: logger, + view: view, + metrics: metrics, + nodeID: hostname, + newFragmentCh: make(chan struct{}, 1), + sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size + doneCh: make(chan struct{}, 1), + partialMonthLocalClientTracker: make(map[string]*activity.EntityRecord), + newGlobalClientFragmentCh: make(chan struct{}, 1), + globalPartialMonthClientTracker: make(map[string]*activity.EntityRecord), + clock: clock, currentSegment: segmentInfo{ startTimestamp: 0, currentClients: &activity.EntityActivityLog{ @@ -428,7 +407,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me secondaryGlobalClientFragments: make([]*activity.LogFragment, 0), inprocessExport: atomic.NewBool(false), precomputedQueryWritten: make(chan struct{}), - dedupClientsUpgradeComplete: atomic.NewBool(false), } config, err := a.loadConfigOrDefault(core.activeContext) @@ -481,8 +459,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for a.currentGlobalFragment = nil a.globalFragmentLock.Unlock() - globalFragments := append(append(secondaryGlobalClients, globalClients), standbyGlobalClients...) - if !a.core.IsPerfSecondary() { if a.currentGlobalFragment != nil { a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_fragment_size"}, @@ -491,24 +467,19 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for {"type", "client"}, }) } + var globalReceivedFragmentTotal int + for _, globalReceivedFragment := range secondaryGlobalClients { + globalReceivedFragmentTotal += len(globalReceivedFragment.Clients) + } + for _, globalReceivedFragment := range standbyGlobalClients { + globalReceivedFragmentTotal += len(globalReceivedFragment.Clients) + } a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_received_fragment_size"}, - float32(len(globalFragments)), + float32(globalReceivedFragmentTotal), []metricsutil.Label{ {"type", "client"}, }) - // Since we are the primary, store global clients - // Create fragments from global clients and store the segment - if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil { - return ret - } - - } else if !a.dedupClientsUpgradeComplete.Load() { - // We are the secondary, and an upgrade is in progress. In this case we will temporarily store the data at this old path - // This data will be garbage collected after the upgrade has completed - if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentSegment, force, ""); ret != nil { - return ret - } } // If segment start time is zero, do not update or write @@ -518,6 +489,15 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for return nil } + // If we are the primary, store global clients + // Create fragments from global clients and store the segment + if !a.core.IsPerfSecondary() { + globalFragments := append(append(secondaryGlobalClients, globalClients), standbyGlobalClients...) + if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil { + return ret + } + } + // Swap out the pending local fragments a.localFragmentLock.Lock() localFragment := a.localFragment @@ -635,74 +615,6 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra return nil } -func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime int64, pathPrefix string, fragments []*activity.LogFragment) error { - tokenByNamespace := make(map[string]uint64) - for _, fragment := range fragments { - // As of 1.9, a fragment should no longer have any NonEntityTokens. However - // in order to not lose any information about the current segment during the - // month when the client upgrades to 1.9, we must retain this functionality. - for ns, val := range fragment.NonEntityTokens { - // We track these pre-1.9 values in the old location, which is - // a.currentSegment.tokenCount, as opposed to the counter that stores tokens - // without entities that have client IDs, namely - // a.partialMonthClientTracker.nonEntityCountByNamespaceID. This preserves backward - // compatibility for the precomputedQueryWorkers and the segment storing - // logic. - tokenByNamespace[ns] += val - } - } - segmentToStore := segmentInfo{ - startTimestamp: startTime, - clientSequenceNumber: 0, - currentClients: &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - }, - tokenCount: &activity.TokenCount{CountByNamespaceID: tokenByNamespace}, - } - - if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil { - return err - } - return nil -} - -func (a *ActivityLog) savePreviousEntitySegments(ctx context.Context, startTime int64, pathPrefix string, allFragments []*activity.LogFragment) error { - deduplicatedClients := make(map[string]*activity.EntityRecord) - for _, f := range allFragments { - for _, entity := range f.GetClients() { - deduplicatedClients[entity.ClientID] = entity - } - } - - segmentToStore := segmentInfo{ - startTimestamp: startTime, - clientSequenceNumber: 0, - currentClients: &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - }, - } - incrementSegmentNum := func() { - segmentToStore.clientSequenceNumber = segmentToStore.clientSequenceNumber + 1 - segmentToStore.currentClients.Clients = make([]*activity.EntityRecord, 0) - } - numAddedClients := 0 - for _, entity := range deduplicatedClients { - segmentToStore.currentClients.Clients = append(segmentToStore.currentClients.Clients, entity) - numAddedClients++ - if numAddedClients%ActivitySegmentClientCapacity == 0 { - if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil { - return err - } - incrementSegmentNum() - } - } - // Store any remaining clients if they exist - if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil { - return err - } - return nil -} - // :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool, currentSegment segmentInfo, storagePathPrefix string) error { _, err := a.saveSegmentEntitiesInternal(ctx, currentSegment, force, storagePathPrefix) @@ -799,30 +711,28 @@ func parseSegmentNumberFromPath(path string) (int, bool) { // availableLogs returns the start_time(s) (in UTC) associated with months for which logs exist, // sorted last to first func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) { - pathSet := make(map[time.Time]struct{}) - out := make([]time.Time, 0) - availableTimes := make([]time.Time, 0) - - times, err := a.availableTimesAtPath(ctx, upTo, activityTokenLocalBasePath) - if err != nil { - return nil, err - } - availableTimes = append(availableTimes, times...) + paths := make([]string, 0) + for _, basePath := range []string{activityLocalPathPrefix + activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} { + p, err := a.view.List(ctx, basePath) + if err != nil { + return nil, err + } - times, err = a.availableTimesAtPath(ctx, upTo, activityGlobalPathPrefix+activityEntityBasePath) - if err != nil { - return nil, err + paths = append(paths, p...) } - availableTimes = append(availableTimes, times...) - times, err = a.availableTimesAtPath(ctx, upTo, activityLocalPathPrefix+activityEntityBasePath) - if err != nil { - return nil, err - } - availableTimes = append(availableTimes, times...) + pathSet := make(map[time.Time]struct{}) + out := make([]time.Time, 0) + for _, path := range paths { + // generate a set of unique start times + segmentTime, err := timeutil.ParseTimeFromPath(path) + if err != nil { + return nil, err + } + if segmentTime.After(upTo) { + continue + } - // Remove duplicate start times - for _, segmentTime := range availableTimes { if _, present := pathSet[segmentTime]; !present { pathSet[segmentTime] = struct{}{} out = append(out, segmentTime) @@ -839,27 +749,6 @@ func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time return out, nil } -// availableTimesAtPath returns a sorted list of all available times at the pathPrefix up until the provided time. -func (a *ActivityLog) availableTimesAtPath(ctx context.Context, onlyIncludeTimesUpTo time.Time, path string) ([]time.Time, error) { - paths, err := a.view.List(ctx, path) - if err != nil { - return nil, err - } - out := make([]time.Time, 0) - for _, path := range paths { - // generate a set of unique start times - segmentTime, err := timeutil.ParseTimeFromPath(path) - if err != nil { - return nil, err - } - if segmentTime.After(onlyIncludeTimesUpTo) { - continue - } - out = append(out, segmentTime) - } - return out, nil -} - // getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent // contiguous set of activity logs, sorted in decreasing order (latest to earliest) func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context, now time.Time) ([]time.Time, error) { @@ -988,42 +877,54 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time // load all the active global clients if !isLocal { globalPath := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) - out, err := a.readEntitySegmentAtPath(ctx, globalPath) - if err != nil && !errors.Is(err, ErrEmptyResponse) { + data, err := a.view.Get(ctx, globalPath) + if err != nil { return err } - if out != nil { - a.globalFragmentLock.Lock() - // Handle the (unlikely) case where the end of the month has been reached while background loading. - // Or the feature has been disabled. - if a.enabled && startTime.Unix() == a.currentGlobalSegment.startTimestamp { - for _, ent := range out.Clients { - a.globalPartialMonthClientTracker[ent.ClientID] = ent - } - } - a.globalFragmentLock.Unlock() + if data == nil { + return nil } - - } else { - // load all the active local clients - localPath := activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) - out, err := a.readEntitySegmentAtPath(ctx, localPath) - if err != nil && !errors.Is(err, ErrEmptyResponse) { + out := &activity.EntityActivityLog{} + err = proto.Unmarshal(data.Value, out) + if err != nil { return err } - if out != nil { - a.localFragmentLock.Lock() - // Handle the (unlikely) case where the end of the month has been reached while background loading. - // Or the feature has been disabled. - if a.enabled && startTime.Unix() == a.currentLocalSegment.startTimestamp { - for _, ent := range out.Clients { - a.partialMonthLocalClientTracker[ent.ClientID] = ent - } + a.globalFragmentLock.Lock() + // Handle the (unlikely) case where the end of the month has been reached while background loading. + // Or the feature has been disabled. + if a.enabled && startTime.Unix() == a.currentGlobalSegment.startTimestamp { + for _, ent := range out.Clients { + a.globalPartialMonthClientTracker[ent.ClientID] = ent } - a.localFragmentLock.Unlock() } + a.globalFragmentLock.Unlock() + return nil + } + // load all the active local clients + localPath := activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) + data, err := a.view.Get(ctx, localPath) + if err != nil { + return err } + if data == nil { + return nil + } + out := &activity.EntityActivityLog{} + err = proto.Unmarshal(data.Value, out) + if err != nil { + return err + } + a.localFragmentLock.Lock() + // Handle the (unlikely) case where the end of the month has been reached while background loading. + // Or the feature has been disabled. + if a.enabled && startTime.Unix() == a.currentLocalSegment.startTimestamp { + for _, ent := range out.Clients { + a.partialMonthLocalClientTracker[ent.ClientID] = ent + } + } + a.localFragmentLock.Unlock() + return nil } @@ -1031,17 +932,23 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time // into memory (to append new entries), and to the globalPartialMonthClientTracker and partialMonthLocalClientTracker to // avoid duplication call with fragmentLock, globalFragmentLock, localFragmentLock and l held. func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, localSegmentSequenceNumber uint64, globalSegmentSequenceNumber uint64) error { - // setting a.currentSegment timestamp to support upgrades - a.currentSegment.startTimestamp = startTime.Unix() - // load current global segment path := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10) - out, err := a.readEntitySegmentAtPath(ctx, path) - if err != nil && !errors.Is(err, ErrEmptyResponse) { + // setting a.currentSegment timestamp to support upgrades + a.currentSegment.startTimestamp = startTime.Unix() + + data, err := a.view.Get(ctx, path) + if err != nil { return err } - if out != nil { + if data != nil { + out := &activity.EntityActivityLog{} + err = proto.Unmarshal(data.Value, out) + if err != nil { + return err + } + if !a.core.perfStandby { a.currentGlobalSegment = segmentInfo{ startTimestamp: startTime.Unix(), @@ -1064,11 +971,17 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti // load current local segment path = activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(localSegmentSequenceNumber, 10) - out, err = a.readEntitySegmentAtPath(ctx, path) - if err != nil && !errors.Is(err, ErrEmptyResponse) { + data, err = a.view.Get(ctx, path) + if err != nil { return err } - if out != nil { + if data != nil { + out := &activity.EntityActivityLog{} + err = proto.Unmarshal(data.Value, out) + if err != nil { + return err + } + if !a.core.perfStandby { a.currentLocalSegment = segmentInfo{ startTimestamp: startTime.Unix(), @@ -1085,41 +998,10 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti for _, client := range out.Clients { a.partialMonthLocalClientTracker[client.ClientID] = client } - } - - return nil -} -func (a *ActivityLog) readEntitySegmentAtPath(ctx context.Context, path string) (*activity.EntityActivityLog, error) { - data, err := a.view.Get(ctx, path) - if err != nil { - return nil, err - } - if data == nil { - return nil, ErrEmptyResponse } - out := &activity.EntityActivityLog{} - err = proto.Unmarshal(data.Value, out) - if err != nil { - return nil, err - } - return out, nil -} -func (a *ActivityLog) readTokenSegmentAtPath(ctx context.Context, path string) (*activity.TokenCount, error) { - data, err := a.view.Get(ctx, path) - if err != nil { - return nil, err - } - if data == nil { - return nil, ErrEmptyResponse - } - out := &activity.TokenCount{} - err = proto.Unmarshal(data.Value, out) - if err != nil { - return nil, err - } - return out, nil + return nil } // tokenCountExists checks if there's a token log for :startTime: @@ -1288,26 +1170,6 @@ func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, close(whenDone) } -func (a *ActivityLog) deleteOldStoragePathWorker(ctx context.Context, pathPrefix string) { - pathTimes, err := a.view.List(ctx, pathPrefix) - if err != nil { - a.logger.Error("could not list segment paths", "error", err) - return - } - for _, pathTime := range pathTimes { - segments, err := a.view.List(ctx, pathPrefix+pathTime) - if err != nil { - a.logger.Error("could not list segment path", "error", err) - } - for _, seqNum := range segments { - err = a.view.Delete(ctx, pathPrefix+pathTime+seqNum) - if err != nil { - a.logger.Error("could not delete log", "error", err) - } - } - } -} - func (a *ActivityLog) WaitForDeletion() { a.l.Lock() // May be nil, if never set @@ -1646,76 +1508,9 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup, r manager.retentionWorker(ctx, manager.clock.Now(), months) close(manager.retentionDone) }(manager.retentionMonths) - - // We do not want to hold up unseal, and we need access to - // the replicationRpcClient in order for the secondary to migrate data. - // This client is only reliable preset after unseal. - c.postUnsealFuncs = append(c.postUnsealFuncs, func() { - c.activityLogMigrationTask(ctx) - }) - - } - return nil -} - -// secondaryDuplicateClientMigrationWorker will attempt to send global data living on the -// current cluster to the primary cluster. This routine will only exit when its connected primary -// has reached version 1.19+, and this cluster has completed sending any global data that lives at the old storage paths -func (c *Core) secondaryDuplicateClientMigrationWorker(ctx context.Context) { - manager := c.activityLog - manager.logger.Trace("started secondary activity log migration worker") - storageMigrationComplete := atomic.NewBool(false) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - if !c.IsPerfSecondary() { - // TODO: Create function for the secondary to continuously attempt to send data to the primary - } - - wg.Done() - }() - wg.Add(1) - go func() { - localClients, _, err := manager.extractLocalGlobalClientsDeprecatedStoragePath(ctx) - if err != nil { - return - } - // Store local clients at new path - for month, entitiesForMonth := range localClients { - logFragments := []*activity.LogFragment{{ - Clients: entitiesForMonth, - }} - if err = manager.savePreviousEntitySegments(ctx, month, activityLocalPathPrefix, logFragments); err != nil { - manager.logger.Error("failed to write local segment", "error", err, "month", month) - return - } - } - storageMigrationComplete.Store(true) - // TODO: generate/store PCQs for these local clients - wg.Done() - }() - wg.Wait() - if !storageMigrationComplete.Load() { - manager.logger.Error("could not complete migration of duplicate clients on cluster") - return } - // We have completed the vital portions of the storage migration - if err := manager.writeDedupClientsUpgrade(ctx); err != nil { - manager.logger.Error("could not complete migration of duplicate clients on cluster") - return - } - - // Now that all the clients have been migrated and PCQs have been created, remove all clients at old storage paths - manager.oldStoragePathsCleaned = make(chan struct{}) - go func() { - defer close(manager.oldStoragePathsCleaned) - manager.deleteOldStoragePathWorker(ctx, activityEntityBasePath) - manager.deleteOldStoragePathWorker(ctx, activityTokenBasePath) - // TODO: Delete old PCQs - }() - manager.dedupClientsUpgradeComplete.Store(true) - manager.logger.Trace("completed secondary activity log migration worker") + return nil } func (a *ActivityLog) hasRegeneratedACME(ctx context.Context) bool { @@ -1727,15 +1522,6 @@ func (a *ActivityLog) hasRegeneratedACME(ctx context.Context) bool { return regenerated != nil } -func (a *ActivityLog) hasDedupClientsUpgrade(ctx context.Context) bool { - regenerated, err := a.view.Get(ctx, activityDeduplicationUpgradeKey) - if err != nil { - a.logger.Error("unable to access deduplication regeneration key") - return false - } - return regenerated != nil -} - func (a *ActivityLog) writeRegeneratedACME(ctx context.Context) error { regeneratedEntry, err := logical.StorageEntryJSON(activityACMERegenerationKey, true) if err != nil { @@ -1744,14 +1530,6 @@ func (a *ActivityLog) writeRegeneratedACME(ctx context.Context) error { return a.view.Put(ctx, regeneratedEntry) } -func (a *ActivityLog) writeDedupClientsUpgrade(ctx context.Context) error { - regeneratedEntry, err := logical.StorageEntryJSON(activityDeduplicationUpgradeKey, true) - if err != nil { - return err - } - return a.view.Put(ctx, regeneratedEntry) -} - func (a *ActivityLog) regeneratePrecomputedQueries(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -1918,12 +1696,7 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) { <-timer.C } } - // Only send data if no upgrade is in progress. Else, the active worker will - // store the data in a temporary location until it is garbage collected - if a.dedupClientsUpgradeComplete.Load() { - sendFunc() - } - + sendFunc() case <-endOfMonth.C: a.logger.Trace("sending global fragment on end of month") // Flush the current fragment, if any @@ -1933,16 +1706,13 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) { <-timer.C } } - // If an upgrade is in progress, don't do anything - // The active fragmentWorker will take care of flushing the clients to a temporary location - if a.dedupClientsUpgradeComplete.Load() { - sendFunc() - // clear active entity set - a.globalFragmentLock.Lock() - a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) - - a.globalFragmentLock.Unlock() - } + sendFunc() + + // clear active entity set + a.globalFragmentLock.Lock() + a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) + + a.globalFragmentLock.Unlock() // Set timer for next month. // The current segment *probably* hasn't been set yet (via invalidation), @@ -4028,110 +3798,6 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f return nil } -func (c *Core) activityLogMigrationTask(ctx context.Context) { - manager := c.activityLog - if !c.IsPerfSecondary() { - // If the oldest version is less than 1.19 and no migrations tasks have been run, kick off the migration task - if !manager.OldestVersionHasDeduplicatedClients(ctx) && !manager.hasDedupClientsUpgrade(ctx) { - go c.primaryDuplicateClientMigrationWorker(ctx) - } else { - // Store that upgrade processes have already been completed - manager.writeDedupClientsUpgrade(ctx) - manager.dedupClientsUpgradeComplete.Store(true) - } - } else { - // We kick off the secondary migration worker in any chance that the primary has not yet upgraded. - // If we have already completed the migration task, it indicates that the cluster has completed sending data to an - // already upgraded primary - if !manager.hasDedupClientsUpgrade(ctx) { - go c.secondaryDuplicateClientMigrationWorker(ctx) - } else { - // Store that upgrade processes have already been completed - manager.writeDedupClientsUpgrade(ctx) - manager.dedupClientsUpgradeComplete.Store(true) - - } - } -} - -// primaryDuplicateClientMigrationWorker will attempt to receive global data living on the -// connected secondary clusters. Once the data has been received, it will combine it with -// its own global data at old storage paths, and migrate all of it to new storage paths on the -// current cluster. This method wil only exit once all connected secondary clusters have -// upgraded to 1.19, and this cluster receives global data from all of them. -func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error { - a := c.activityLog - a.logger.Trace("started primary activity log migration worker") - - // Collect global clients from secondary - err := a.waitForSecondaryGlobalClients(ctx) - if err != nil { - return err - } - - // Get local and global entities from previous months - clusterLocalClients, clusterGlobalClients, err := a.extractLocalGlobalClientsDeprecatedStoragePath(ctx) - if err != nil { - a.logger.Error("could not extract local and global clients from storage", "error", err) - return err - } - // Get tokens from previous months at old storage paths - clusterTokens, err := a.extractTokensDeprecatedStoragePath(ctx) - - // TODO: Collect clients from secondaries into slice of fragments - - // Store global clients at new path - for month, entitiesForMonth := range clusterGlobalClients { - logFragments := []*activity.LogFragment{{ - Clients: entitiesForMonth, - }} - if err = a.savePreviousEntitySegments(ctx, month, activityGlobalPathPrefix, logFragments); err != nil { - a.logger.Error("failed to write global segment", "error", err, "month", month) - return err - } - } - // Store local clients at new path - for month, entitiesForMonth := range clusterLocalClients { - logFragments := []*activity.LogFragment{{ - Clients: entitiesForMonth, - }} - if err = a.savePreviousEntitySegments(ctx, month, activityLocalPathPrefix, logFragments); err != nil { - a.logger.Error("failed to write local segment", "error", err, "month", month) - return err - } - } - // Store tokens at new path - for month, tokenCount := range clusterTokens { - // Combine all token counts from all clusters - logFragments := make([]*activity.LogFragment, len(tokenCount)) - for i, tokens := range tokenCount { - logFragments[i] = &activity.LogFragment{NonEntityTokens: tokens} - } - if err = a.savePreviousTokenSegments(ctx, month, activityLocalPathPrefix+activityTokenBasePath, logFragments); err != nil { - a.logger.Error("failed to write token segment", "error", err, "month", month) - return err - } - } - - // TODO: After data has been migrated to new locations, we will regenerate all the global and local PCQs - - if err := a.writeDedupClientsUpgrade(ctx); err != nil { - a.logger.Error("could not complete migration of duplicate clients on cluster") - return err - } - // Garbage collect data at old paths - a.oldStoragePathsCleaned = make(chan struct{}) - go func() { - defer close(a.oldStoragePathsCleaned) - a.deleteOldStoragePathWorker(ctx, activityEntityBasePath) - a.deleteOldStoragePathWorker(ctx, activityTokenBasePath) - // We will also need to delete old PCQs - }() - a.dedupClientsUpgradeComplete.Store(true) - a.logger.Trace("completed primary activity log migration worker") - return nil -} - type encoder interface { Encode(*ActivityLogExportRecord) error Flush() diff --git a/vault/activity_log_stubs_oss.go b/vault/activity_log_stubs_oss.go index e7115d41e475..7d2457360563 100644 --- a/vault/activity_log_stubs_oss.go +++ b/vault/activity_log_stubs_oss.go @@ -5,22 +5,11 @@ package vault -import ( - "context" - "errors" -) +import "context" //go:generate go run github.com/hashicorp/vault/tools/stubmaker -// ErrEmptyResponse error is used to avoid returning "nil, nil" from a function -var ErrEmptyResponse = errors.New("empty response; the system encountered a statement that exclusively returns nil values") - // sendGlobalClients is a no-op on CE func (a *ActivityLog) sendGlobalClients(ctx context.Context) error { return nil } - -// waitForSecondaryGlobalClients is a no-op on CE -func (a *ActivityLog) waitForSecondaryGlobalClients(ctx context.Context) error { - return nil -} diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 8599592d8007..1f36a7856582 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -5821,280 +5821,3 @@ func TestCreateSegment_StoreSegment(t *testing.T) { }) } } - -// TestActivityLog_PrimaryDuplicateClientMigrationWorker verifies that the primary -// migration worker correctly moves data from old location to the new location -func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) { - cluster := NewTestCluster(t, nil, nil) - core := cluster.Cores[0].Core - a := core.activityLog - a.SetEnable(true) - - ctx := context.Background() - timeStamp := time.Now() - startOfMonth := timeutil.StartOfMonth(timeStamp) - oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp) - twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo) - - clientRecordsGlobal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) - for i := range clientRecordsGlobal { - clientRecordsGlobal[i] = &activity.EntityRecord{ - ClientID: fmt.Sprintf("111122222-3333-4444-5555-%012v", i), - Timestamp: timeStamp.Unix(), - NonEntity: false, - } - } - clientRecordsLocal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) - for i := range clientRecordsGlobal { - clientRecordsLocal[i] = &activity.EntityRecord{ - ClientID: fmt.Sprintf("011122222-3333-4444-5555-%012v", i), - Timestamp: timeStamp.Unix(), - // This is to trick the system into believing this a local client when parsing data - ClientType: nonEntityTokenActivityType, - } - } - - tokenCounts := map[string]uint64{ - "ns1": 10, - "ns2": 11, - "ns3": 12, - } - - // Write global and local clients to old path - a.savePreviousEntitySegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal, clientRecordsGlobal...)}}) - a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}}) - a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}}) - - // Assert that the migration workers have not been run - require.True(t, a.hasDedupClientsUpgrade(ctx)) - require.True(t, a.dedupClientsUpgradeComplete.Load()) - - // Resetting this to false so that we can - // verify that after the migrations is completed, the correct values have been stored - a.dedupClientsUpgradeComplete.Store(false) - require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey)) - - // Forcefully run the primary migration worker - core.primaryDuplicateClientMigrationWorker(ctx) - - // Verify that we have the correct number of global clients at the new storage paths - times := []time.Time{twoMonthsAgo, oneMonthAgo, startOfMonth} - for index, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - globalClients := make([]*activity.EntityRecord, 0) - for { - segment, err := reader.ReadGlobalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - globalClients = append(globalClients, segment.GetClients()...) - } - require.Equal(t, len(clientRecordsGlobal)-index, len(globalClients)) - } - - // Verify local clients - for index, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - localClients := make([]*activity.EntityRecord, 0) - for { - segment, err := reader.ReadLocalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - localClients = append(localClients, segment.GetClients()...) - } - require.Equal(t, len(clientRecordsLocal)-index, len(localClients)) - } - - // Verify non-entity tokens have been correctly migrated - for _, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - for { - segment, err := reader.ReadToken(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - // Verify that the data is correct - deep.Equal(segment.GetCountByNamespaceID(), tokenCounts) - } - } - - // Check that the storage key has been updated - require.True(t, a.hasDedupClientsUpgrade(ctx)) - // Check that the bool has been updated - require.True(t, a.dedupClientsUpgradeComplete.Load()) - - // Wait for the deletion of old logs to complete - timeout := time.After(25 * time.Second) - // Wait for channel indicating deletion to be written - select { - case <-timeout: - t.Fatal("timed out waiting for deletion to complete") - case <-a.oldStoragePathsCleaned: - break - } - - // Verify there is no data at the old paths - times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath) - require.NoError(t, err) - require.Equal(t, 0, len(times)) - - // Verify there is no data at the old token paths - times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath) - require.NoError(t, err) - require.Equal(t, 0, len(times)) -} - -// TestActivityLog_SecondaryDuplicateClientMigrationWorker verifies that the secondary -// migration worker correctly moves local data from old location to the new location -func TestActivityLog_SecondaryDuplicateClientMigrationWorker(t *testing.T) { - cluster := NewTestCluster(t, nil, nil) - core := cluster.Cores[0].Core - a := core.activityLog - a.SetEnable(true) - - ctx := context.Background() - timeStamp := time.Now() - startOfMonth := timeutil.StartOfMonth(timeStamp) - oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp) - twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo) - - clientRecordsGlobal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) - for i := range clientRecordsGlobal { - clientRecordsGlobal[i] = &activity.EntityRecord{ - ClientID: fmt.Sprintf("111122222-3333-4444-5555-%012v", i), - Timestamp: timeStamp.Unix(), - NonEntity: false, - } - } - clientRecordsLocal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) - for i := range clientRecordsGlobal { - clientRecordsLocal[i] = &activity.EntityRecord{ - ClientID: fmt.Sprintf("011122222-3333-4444-5555-%012v", i), - Timestamp: timeStamp.Unix(), - // This is to trick the system into believing this a local client when parsing data - ClientType: nonEntityTokenActivityType, - } - } - - tokenCounts := map[string]uint64{ - "ns1": 10, - "ns2": 11, - "ns3": 12, - } - - // Write global and local clients to old path - a.savePreviousEntitySegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal, clientRecordsGlobal...)}}) - a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}}) - a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}}) - - // Write tokens to old path - a.savePreviousTokenSegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}}) - a.savePreviousTokenSegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}}) - a.savePreviousTokenSegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}}) - - // Assert that the migration workers have not been run - require.True(t, a.hasDedupClientsUpgrade(ctx)) - require.True(t, a.dedupClientsUpgradeComplete.Load()) - - // Resetting this to false so that we can - // verify that after the migrations is completed, the correct values have been stored - a.dedupClientsUpgradeComplete.Store(false) - require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey)) - - // Forcefully run the secondary migration worker - core.secondaryDuplicateClientMigrationWorker(ctx) - - // Wait for the storage migration to complete - ticker := time.NewTicker(100 * time.Millisecond) - timeout := time.After(25 * time.Second) - for { - select { - case <-timeout: - t.Fatal("timed out waiting for migration to complete") - case <-ticker.C: - } - if a.dedupClientsUpgradeComplete.Load() { - break - } - } - - // Verify that no global clients have been migrated - times := []time.Time{twoMonthsAgo, oneMonthAgo, startOfMonth} - for _, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - globalClients := make([]*activity.EntityRecord, 0) - for { - segment, err := reader.ReadGlobalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - globalClients = append(globalClients, segment.GetClients()...) - } - require.Equal(t, 0, len(globalClients)) - } - - // Verify local clients have been correctly migrated - for index, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - localClients := make([]*activity.EntityRecord, 0) - for { - segment, err := reader.ReadLocalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - localClients = append(localClients, segment.GetClients()...) - } - require.Equal(t, len(clientRecordsLocal)-index, len(localClients)) - } - - // Verify non-entity tokens have been correctly migrated - for _, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - for { - segment, err := reader.ReadToken(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - // Verify that the data is correct - deep.Equal(segment.GetCountByNamespaceID(), tokenCounts) - } - } - - // Check that the storage key has been updated - require.True(t, a.hasDedupClientsUpgrade(ctx)) - // Check that the bool has been updated - require.True(t, a.dedupClientsUpgradeComplete.Load()) - - // Wait for the deletion of old logs to complete - timeout = time.After(25 * time.Second) - // Wait for channel indicating deletion to be written - select { - case <-timeout: - t.Fatal("timed out waiting for deletion to complete") - case <-a.oldStoragePathsCleaned: - break - } - - // Verify there is no data at the old entity paths - times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath) - require.NoError(t, err) - require.Equal(t, 0, len(times)) - - // Verify there is no data at the old token paths - times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath) - require.NoError(t, err) - require.Equal(t, 0, len(times)) -} diff --git a/vault/activity_log_util_common.go b/vault/activity_log_util_common.go index 86c824adebab..f3cd616ed99a 100644 --- a/vault/activity_log_util_common.go +++ b/vault/activity_log_util_common.go @@ -14,7 +14,6 @@ import ( "time" "github.com/axiomhq/hyperloglog" - semver "github.com/hashicorp/go-version" "github.com/hashicorp/vault/helper/timeutil" "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault/activity" @@ -553,98 +552,3 @@ func (a *ActivityLog) namespaceRecordToCountsResponse(record *activity.Namespace ACMEClients: int(record.ACMEClients), } } - -func (a *ActivityLog) extractLocalGlobalClientsDeprecatedStoragePath(ctx context.Context) (map[int64][]*activity.EntityRecord, map[int64][]*activity.EntityRecord, error) { - clusterGlobalClients := make(map[int64][]*activity.EntityRecord) - clusterLocalClients := make(map[int64][]*activity.EntityRecord) - - // Extract global clients on the current cluster per month store them in a map - times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath) - if err != nil { - a.logger.Error("could not list available logs until now") - return clusterLocalClients, clusterGlobalClients, fmt.Errorf("could not list available logs on the cluster") - } - for _, time := range times { - entityPath := activityEntityBasePath + fmt.Sprint(time.Unix()) + "/" - segmentPaths, err := a.view.List(ctx, entityPath) - if err != nil { - return nil, nil, err - } - for _, seqNumber := range segmentPaths { - segment, err := a.readEntitySegmentAtPath(ctx, entityPath+seqNumber) - if segment == nil { - continue - } - if err != nil { - a.logger.Warn("failed to read segment", "error", err) - return clusterLocalClients, clusterGlobalClients, err - } - for _, entity := range segment.GetClients() { - // If the client is not local, then add it to a map - if local, _ := a.isClientLocal(entity); !local { - if _, ok := clusterGlobalClients[time.Unix()]; !ok { - clusterGlobalClients[time.Unix()] = make([]*activity.EntityRecord, 0) - } - clusterGlobalClients[time.Unix()] = append(clusterGlobalClients[time.Unix()], entity) - } else { - if _, ok := clusterLocalClients[time.Unix()]; !ok { - clusterLocalClients[time.Unix()] = make([]*activity.EntityRecord, 0) - } - clusterLocalClients[time.Unix()] = append(clusterLocalClients[time.Unix()], entity) - } - } - } - } - - return clusterLocalClients, clusterGlobalClients, nil -} - -func (a *ActivityLog) extractTokensDeprecatedStoragePath(ctx context.Context) (map[int64][]map[string]uint64, error) { - tokensByMonth := make(map[int64][]map[string]uint64) - times, err := a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath) - if err != nil { - return nil, err - } - for _, monthTime := range times { - tokenPath := activityTokenBasePath + fmt.Sprint(monthTime.Unix()) + "/" - segmentPaths, err := a.view.List(ctx, tokenPath) - if err != nil { - return nil, err - } - tokensByMonth[monthTime.Unix()] = make([]map[string]uint64, 0) - for _, seqNum := range segmentPaths { - tokenCount, err := a.readTokenSegmentAtPath(ctx, tokenPath+seqNum) - if tokenCount == nil { - a.logger.Error("data at path has been unexpectedly deleted", "path", tokenPath+seqNum) - continue - } - if err != nil { - return nil, err - } - tokensByMonth[monthTime.Unix()] = append(tokensByMonth[monthTime.Unix()], tokenCount.CountByNamespaceID) - } - } - return tokensByMonth, nil -} - -// OldestVersionHasDeduplicatedClients returns whether this cluster is 1.19+, and -// hence supports deduplicated clients -func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) bool { - oldestVersionIsDedupClients := a.core.IsNewInstall(ctx) - if !oldestVersionIsDedupClients { - if v, _, err := a.core.FindOldestVersionTimestamp(); err == nil { - oldestVersion, err := semver.NewSemver(v) - if err != nil { - a.core.logger.Debug("could not extract version instance", "version", v) - return false - } - dedupChangeVersion, err := semver.NewSemver(DeduplicatedClientMinimumVersion) - if err != nil { - a.core.logger.Debug("could not extract version instance", "version", DeduplicatedClientMinimumVersion) - return false - } - oldestVersionIsDedupClients = oldestVersionIsDedupClients || oldestVersion.GreaterThanOrEqual(dedupChangeVersion) - } - } - return oldestVersionIsDedupClients -} diff --git a/vault/activity_log_util_common_test.go b/vault/activity_log_util_common_test.go index 2d0a0c4ceee2..f84775da3fc2 100644 --- a/vault/activity_log_util_common_test.go +++ b/vault/activity_log_util_common_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/axiomhq/hyperloglog" - "github.com/go-test/deep" "github.com/hashicorp/vault/helper/timeutil" "github.com/hashicorp/vault/vault/activity" "github.com/stretchr/testify/require" @@ -1015,14 +1014,6 @@ func writeTokenSegment(t *testing.T, core *Core, ts time.Time, index int, item * WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, ts, index), protoItem) } -// writeTokenSegmentOldPath writes a single segment file with the given time and index for a token at the old path -func writeTokenSegmentOldPath(t *testing.T, core *Core, ts time.Time, index int, item *activity.TokenCount) { - t.Helper() - protoItem, err := proto.Marshal(item) - require.NoError(t, err) - WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, ts, index), protoItem) -} - // makeSegmentPath formats the path for a segment at a particular time and index func makeSegmentPath(t *testing.T, typ string, ts time.Time, index int) string { t.Helper() @@ -1222,50 +1213,3 @@ func TestSegmentFileReader(t *testing.T) { require.True(t, proto.Equal(gotTokens[i], tokens[i])) } } - -// TestExtractTokens_OldStoragePaths verifies that the correct tokens are extracted -// from the old token paths in storage. These old storage paths were used in <=1.9 to -// store tokens without clientIds (non-entity tokens). -func TestExtractTokens_OldStoragePaths(t *testing.T) { - core, _, _ := TestCoreUnsealed(t) - now := time.Now() - - // write token at index 3 - token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{ - "ns": 10, - "ns3": 1, - "ns1": 2, - }} - - lastMonth := timeutil.StartOfPreviousMonth(now) - twoMonthsAgo := timeutil.StartOfPreviousMonth(lastMonth) - - thisMonthData := []map[string]uint64{token.CountByNamespaceID, token.CountByNamespaceID} - lastMonthData := []map[string]uint64{token.CountByNamespaceID, token.CountByNamespaceID, token.CountByNamespaceID, token.CountByNamespaceID} - twoMonthsAgoData := []map[string]uint64{token.CountByNamespaceID} - - expected := map[int64][]map[string]uint64{ - now.Unix(): thisMonthData, - lastMonth.Unix(): lastMonthData, - twoMonthsAgo.Unix(): twoMonthsAgoData, - } - - // This month's token data is at broken segment sequences - writeTokenSegmentOldPath(t, core, now, 1, token) - writeTokenSegmentOldPath(t, core, now, 3, token) - // Last months token data is at normal segment sequences - writeTokenSegmentOldPath(t, core, lastMonth, 0, token) - writeTokenSegmentOldPath(t, core, lastMonth, 1, token) - writeTokenSegmentOldPath(t, core, lastMonth, 2, token) - writeTokenSegmentOldPath(t, core, lastMonth, 3, token) - // Month before is at only one random segment sequence - writeTokenSegmentOldPath(t, core, twoMonthsAgo, 2, token) - - tokens, err := core.activityLog.extractTokensDeprecatedStoragePath(context.Background()) - require.NoError(t, err) - require.Equal(t, 3, len(tokens)) - - if diff := deep.Equal(expected, tokens); diff != nil { - t.Fatal(diff) - } -}