Skip to content

Commit

Permalink
[refactor] - Rename S3 ProgressTracker (#3652)
Browse files Browse the repository at this point in the history
* rename

* update

* fix typo
  • Loading branch information
ahrav authored Nov 22, 2024
1 parent 3c69bbc commit 9a6cad9
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 102 deletions.
116 changes: 63 additions & 53 deletions pkg/sources/s3/progress_tracker.go → pkg/sources/s3/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@ import (
"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

// ProgressTracker maintains scan progress state for S3 bucket scanning,
// Checkpointer maintains resumption state for S3 bucket scanning,
// enabling resumable scans by tracking which objects have been successfully processed.
// It provides checkpoints that can be used to resume interrupted scans without missing objects.
//
// S3 buckets are organized as flat namespaces of objects identified by unique keys.
// When listing objects, S3 returns paginated results with a maximum of 1000 objects per page.
// The ListObjectsV2 API accepts a 'StartAfter' parameter that allows resuming the listing
// from a specific object key.
//
// The tracker maintains state for the current page of objects (up to 1000) using a boolean array
// The checkpointer maintains state for the current page of objects (up to 1000) using a boolean array
// to track completion status and an ordered list to record the sequence of completions.
// This enables finding the highest consecutive completed index as a "low water mark".
//
Expand All @@ -41,28 +37,38 @@ import (
// Page 1 (objects 0-999): Fully processed, checkpoint saved at object 999
// Page 2 (objects 1000-1999): Partially processed through 1600, but only consecutive through 1499
// On resume: StartAfter=object1499 in saved bucket, scanning continues from object 1500
type ProgressTracker struct {
//
// Important constraints:
// - Only tracks completion state for a single page of objects (up to 1000)
// - Supports concurrent object processing within a page
// - Does NOT support concurrent page processing
// - Must be Reset() between pages
type Checkpointer struct {
enabled bool

// completedObjects tracks which indices in the current page have been processed.
sync.Mutex
mu sync.Mutex // protects concurrent access to completion state.
completedObjects []bool
completionOrder []int // Track the order in which objects complete

// lowestIncompleteIdx tracks the first index that hasn't been completed.
// This optimizes checkpoint creation by avoiding recalculation.
lowestIncompleteIdx int

// progress holds the scan's overall progress state and enables persistence.
// The EncodedResumeInfo field stores the JSON-encoded ResumeInfo checkpoint.
progress *sources.Progress // Reference to source's Progress
}

const defaultMaxObjectsPerPage = 1000

// NewProgressTracker creates a new progress tracker for S3 scanning operations.
// The enabled parameter determines if progress tracking is active, and progress
// NewCheckpointer creates a new checkpointer for S3 scanning operations.
// The enabled parameter determines if checkpointing is active, and progress
// provides the underlying mechanism for persisting scan state.
func NewProgressTracker(ctx context.Context, enabled bool, progress *sources.Progress) *ProgressTracker {
ctx.Logger().Info("Creating progress tracker")
func NewCheckpointer(ctx context.Context, enabled bool, progress *sources.Progress) *Checkpointer {
ctx.Logger().Info("Creating checkpointer")

return &ProgressTracker{
return &Checkpointer{
// We are resuming if we have completed objects from a previous scan.
completedObjects: make([]bool, defaultMaxObjectsPerPage),
completionOrder: make([]int, 0, defaultMaxObjectsPerPage),
Expand All @@ -72,16 +78,18 @@ func NewProgressTracker(ctx context.Context, enabled bool, progress *sources.Pro
}

// Reset prepares the tracker for a new page of objects by clearing the completion state.
func (p *ProgressTracker) Reset() {
// Must be called before processing each new page of objects.
func (p *Checkpointer) Reset() {
if !p.enabled {
return
}

p.Lock()
defer p.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
// Store the current completed count before moving to next page.
p.completedObjects = make([]bool, defaultMaxObjectsPerPage)
p.completionOrder = make([]int, 0, defaultMaxObjectsPerPage)
p.lowestIncompleteIdx = 0
}

// ResumeInfo represents the state needed to resume an interrupted operation.
Expand All @@ -92,11 +100,11 @@ type ResumeInfo struct {
StartAfter string `json:"start_after"` // Last processed object key
}

// GetResumePoint retrieves the last saved checkpoint state if one exists.
// ResumePoint retrieves the last saved checkpoint state if one exists.
// It returns nil if progress tracking is disabled or no resume state exists.
// This method decodes the stored resume information and validates it contains
// the minimum required data to enable resumption.
func (p *ProgressTracker) GetResumePoint(ctx context.Context) (ResumeInfo, error) {
func (p *Checkpointer) ResumePoint(ctx context.Context) (ResumeInfo, error) {
resume := ResumeInfo{}

if !p.enabled || p.progress.EncodedResumeInfo == "" {
Expand All @@ -118,7 +126,7 @@ func (p *ProgressTracker) GetResumePoint(ctx context.Context) (ResumeInfo, error

// Complete marks the entire scanning operation as finished and clears the resume state.
// This should only be called once all scanning operations are complete.
func (p *ProgressTracker) Complete(_ context.Context, message string) error {
func (p *Checkpointer) Complete(_ context.Context, message string) error {
// Preserve existing progress counters while clearing resume state.
p.progress.SetProgressComplete(
int(p.progress.SectionsCompleted),
Expand All @@ -129,14 +137,11 @@ func (p *ProgressTracker) Complete(_ context.Context, message string) error {
return nil
}

// UpdateObjectProgress records successfully processed objects within the current page
// UpdateObjectCompletion records successfully processed objects within the current page
// and maintains fine-grained resumption checkpoints. It uses a conservative tracking
// strategy that ensures no objects are missed by only checkpointing consecutively
// completed objects.
//
// This method manages the detailed object-level progress tracking and creates
// checkpoints that enable resumption of interrupted scans.
//
// This approach ensures scan reliability by only checkpointing consecutively completed
// objects. While this may result in re-scanning some objects when resuming, it guarantees
// no objects are missed in case of interruption.
Expand All @@ -146,10 +151,13 @@ func (p *ProgressTracker) Complete(_ context.Context, message string) error {
// - Objects completed: [0,1,2,3,4,5,7,8]
// - The checkpoint will only include objects 0-5 since they are consecutive
// - If scanning is interrupted and resumed:
// - Scan resumes after object 5 (the last checkpoint)
// - Objects 7-8 will be re-scanned even though they completed before
// - This ensures object 6 is not missed
func (p *ProgressTracker) UpdateObjectProgress(
// -- Scan resumes after object 5 (the last checkpoint)
// -- Objects 7-8 will be re-scanned even though they completed before
// -- This ensures object 6 is not missed
//
// Thread-safe for concurrent object processing within a single page.
// WARNING: Not safe for concurrent page processing.
func (p *Checkpointer) UpdateObjectCompletion(
ctx context.Context,
completedIdx int,
bucket string,
Expand All @@ -166,46 +174,48 @@ func (p *ProgressTracker) UpdateObjectProgress(
return fmt.Errorf("completed index %d exceeds maximum page size", completedIdx)
}

p.Lock()
defer p.Unlock()
p.mu.Lock()
defer p.mu.Unlock()

// Only track completion if this is the first time this index is marked complete.
// Only process if this is the first time this index is marked complete.
if !p.completedObjects[completedIdx] {
p.completedObjects[completedIdx] = true
p.completionOrder = append(p.completionOrder, completedIdx)
}

// Find the highest safe checkpoint we can create.
lastSafeIdx := -1
var safeIndices [defaultMaxObjectsPerPage]bool

// Mark all completed indices.
for _, idx := range p.completionOrder {
safeIndices[idx] = true
// If we completed the lowest incomplete index, scan forward to find the new lowest.
if completedIdx == p.lowestIncompleteIdx {
p.advanceLowestIncompleteIdx()
}
}

// Find the highest consecutive completed index.
for i := range len(p.completedObjects) {
if !safeIndices[i] {
break
}
lastSafeIdx = i
// lowestIncompleteIdx points to first incomplete object, so everything before
// it is complete. We want to checkpoint at the last complete object.
checkpointIdx := p.lowestIncompleteIdx - 1
if checkpointIdx < 0 {
return nil // No completed objects yet
}
obj := pageContents[checkpointIdx]

// Update progress if we have at least one completed object.
if lastSafeIdx < 0 {
return nil
return p.updateCheckpoint(bucket, *obj.Key)
}

// advanceLowestIncompleteIdx moves the lowest incomplete index forward to the next incomplete object.
// Must be called with lock held.
func (p *Checkpointer) advanceLowestIncompleteIdx() {
for p.lowestIncompleteIdx < len(p.completedObjects) &&
p.completedObjects[p.lowestIncompleteIdx] {
p.lowestIncompleteIdx++
}
}

obj := pageContents[lastSafeIdx]
info := &ResumeInfo{CurrentBucket: bucket, StartAfter: *obj.Key}
encoded, err := json.Marshal(info)
// updateCheckpoint persists the current resumption state.
// Must be called with lock held.
func (p *Checkpointer) updateCheckpoint(bucket string, lastKey string) error {
encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey})
if err != nil {
return err
return fmt.Errorf("failed to encode resume info: %w", err)
}

// Purposefully avoid updating any progress counts.
// Only update resume info.
p.progress.SetProgressComplete(
int(p.progress.SectionsCompleted),
int(p.progress.SectionsRemaining),
Expand Down
Loading

0 comments on commit 9a6cad9

Please sign in to comment.