Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
write generated export padding to the database, ensure overlapping ex… (
Browse files Browse the repository at this point in the history
#1119)

* write generated export padding to the database, ensure overlapping exports contain same data

* add multi lock docs

* tabbing

* review comments
  • Loading branch information
mikehelmick authored Oct 29, 2020
1 parent d6b6371 commit 3a3417b
Show file tree
Hide file tree
Showing 8 changed files with 476 additions and 87 deletions.
54 changes: 54 additions & 0 deletions internal/database/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"

"github.com/google/exposure-notifications-server/pkg/logging"
Expand All @@ -35,6 +36,59 @@ var (
// UnlockFn can be deferred to release a lock.
type UnlockFn func() error

// MultiLock obtains multiple locks in a single transaction. Either all locks are obtained, or
// the transaction is rolled back.
// The lockIDs are sorted by normal ascending string sort order before obtaining the locks.
func (db *DB) MultiLock(ctx context.Context, lockIDs []string, ttl time.Duration) (UnlockFn, error) {
if len(lockIDs) == 0 {
return nil, fmt.Errorf("no lockIDs presented")
}

lockOrder := make([]string, len(lockIDs))
// Make a copy of the slice so that we have a stable slice in the unlcok function.
copy(lockOrder, lockIDs)
sort.Strings(lockOrder)

var expires time.Time
err := db.InTx(ctx, pgx.Serializable, func(tx pgx.Tx) error {
for _, lockID := range lockOrder {
row := tx.QueryRow(ctx, `SELECT AcquireLock($1, $2)`, lockID, int(ttl.Seconds()))
if err := row.Scan(&expires); err != nil {
return err
}
if expires.Before(thePast) {
return ErrAlreadyLocked
}
}
return nil
})
if err != nil {
return nil, err
}
logging.FromContext(ctx).Debugf("Acquired locks %v", lockOrder)
return makeMultiUnlockFn(ctx, db, lockOrder, expires), nil
}

func makeMultiUnlockFn(ctx context.Context, db *DB, lockIDs []string, expires time.Time) UnlockFn {
return func() error {
return db.InTx(ctx, pgx.Serializable, func(tx pgx.Tx) error {
for i := len(lockIDs) - 1; i >= 0; i-- {
lockID := lockIDs[i]
row := tx.QueryRow(ctx, `SELECT ReleaseLock($1, $2)`, lockID, expires)
var released bool
if err := row.Scan(&released); err != nil {
return err
}
if !released {
return fmt.Errorf("cannot delete lock %q that no longer belongs to you; it likely expired and was taken by another process", lockID)
}
logging.FromContext(ctx).Debugf("Released lock %q", lockID)
}
return nil
})
}
}

// Lock acquires lock with given name that times out after ttl. Returns an UnlockFn that can be used to unlock the lock. ErrAlreadyLocked will be returned if there is already a lock in use.
func (db *DB) Lock(ctx context.Context, lockID string, ttl time.Duration) (UnlockFn, error) {
var expires time.Time
Expand Down
43 changes: 43 additions & 0 deletions internal/database/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,46 @@ func TestLock(t *testing.T) {
t.Fatalf("got %d rows from Lock table, wanted zero", count)
}
}

func TestMultiLock(t *testing.T) {
t.Parallel()

testDB := NewTestDatabase(t)
ctx := context.Background()

neededLocks := []string{"traveler", "US", "CA", "MX"}

unlock1, err := testDB.MultiLock(ctx, neededLocks, time.Minute)
if err != nil {
t.Fatalf("failed to obtain locks, %v, err: %v", neededLocks, err)
}

overlappingLocks := []string{"CA", "CH", "UK"}

if _, err := testDB.MultiLock(ctx, overlappingLocks, time.Minute); err == nil {
t.Fatalf("expected lock acquisition to fail, but didn't.")
} else if !errors.Is(err, ErrAlreadyLocked) {
t.Fatalf("wong error want: %v, got: %v", ErrAlreadyLocked, err)
}

nonoverlappingLocks := []string{"CH", "UK"}
unlock2, err := testDB.MultiLock(ctx, nonoverlappingLocks, time.Minute)
if err != nil {
t.Fatalf("failed to obtain locks, %v, err: %v", nonoverlappingLocks, err)
}

if err := unlock1(); err != nil {
t.Fatalf("failed to release locks: %v", err)
}

// should still fail, because there is still ovelap w/ the second lock.
if _, err := testDB.MultiLock(ctx, overlappingLocks, time.Minute); err == nil {
t.Fatalf("expected lock acquisition to fail, but didn't.")
} else if !errors.Is(err, ErrAlreadyLocked) {
t.Fatalf("wong error want: %v, got: %v", ErrAlreadyLocked, err)
}

if err := unlock2(); err != nil {
t.Fatalf("failed to release locks: %v", err)
}
}
19 changes: 10 additions & 9 deletions internal/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ type Config struct {
Storage storage.Config
ObservabilityExporter observability.Config

Port string `env:"PORT, default=8080"`
CreateTimeout time.Duration `env:"CREATE_BATCHES_TIMEOUT, default=5m"`
WorkerTimeout time.Duration `env:"WORKER_TIMEOUT, default=5m"`
MinRecords int `env:"EXPORT_FILE_MIN_RECORDS, default=1000"`
PaddingRange int `env:"EXPORT_FILE_PADDING_RANGE, default=100"`
MaxRecords int `env:"EXPORT_FILE_MAX_RECORDS, default=30000"`
TruncateWindow time.Duration `env:"TRUNCATE_WINDOW, default=1h"`
MinWindowAge time.Duration `env:"MIN_WINDOW_AGE, default=2h"`
TTL time.Duration `env:"CLEANUP_TTL, default=336h"`
Port string `env:"PORT, default=8080"`
CreateTimeout time.Duration `env:"CREATE_BATCHES_TIMEOUT, default=5m"`
WorkerTimeout time.Duration `env:"WORKER_TIMEOUT, default=5m"`
MinRecords int `env:"EXPORT_FILE_MIN_RECORDS, default=1000"`
PaddingRange int `env:"EXPORT_FILE_PADDING_RANGE, default=100"`
MaxRecords int `env:"EXPORT_FILE_MAX_RECORDS, default=30000"`
MaxInsertBatchSize int `env:"MAX_INSERT_BATCH_SIZE, default=100"`
TruncateWindow time.Duration `env:"TRUNCATE_WINDOW, default=1h"`
MinWindowAge time.Duration `env:"MIN_WINDOW_AGE, default=2h"`
TTL time.Duration `env:"CLEANUP_TTL, default=336h"`
}

func (c *Config) BlobstoreConfig() *storage.Config {
Expand Down
5 changes: 5 additions & 0 deletions internal/export/database/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, batchMaxC
var openBatchIDs []int64

if err := db.db.InTx(ctx, pgx.ReadCommitted, func(tx pgx.Tx) error {
// batches are ordered by end time. We try to fill the "oldest"
// batches first so that padding written will be covered by
// "newer" export batches.
rows, err := tx.Query(ctx, `
SELECT
batch_id
Expand All @@ -535,6 +538,8 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, batchMaxC
)
AND
end_timestamp < $3
ORDER BY
end_timestamp ASC
LIMIT 100
`, model.ExportBatchOpen, model.ExportBatchPending, batchMaxCloseTime)
if err != nil {
Expand Down
Loading

0 comments on commit 3a3417b

Please sign in to comment.