Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Angelo De Caro <[email protected]>
  • Loading branch information
adecaro committed Nov 22, 2024
1 parent 0a2407b commit f4c5bb7
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 61 deletions.
12 changes: 6 additions & 6 deletions docs/core-token.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ token:
retryInterval: 5s
# retry to gain a lock on tokens this amount of times before failing the transaction
numRetries: 3
# evictionInterval is the period a token can be locked, after which it is forcefully unlocked
# if evictionInterval is zero, the eviction algorithm is never executed
evictionInterval: 3m
# cleanupTickPeriod defines how often the eviction algorithm must be executed
# if cleanupTickPeriod is zero, the eviction algorithm is never executed
cleanupTickPeriod: 1m
# leaseExpiry is the period a token can be locked, after which it is forcefully unlocked
# if leaseExpiry is zero, the eviction algorithm is never executed
leaseExpiry: 3m
# leaseCleanupTickPeriod defines how often the eviction algorithm must be executed
# if leaseCleanupTickPeriod is zero, the eviction algorithm is never executed
leaseCleanupTickPeriod: 90s

tms:
mytms: # unique name of this token management system
Expand Down
2 changes: 1 addition & 1 deletion token/services/db/driver/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ type TokenLockDB interface {
// Cleanup removes the locks such that either:
// 1. The transaction that locked that token is valid or invalid;
// 2. The lock is too old.
Cleanup(evictionDelay time.Duration) error
Cleanup(leaseExpiry time.Duration) error
// Close closes the database
Close() error
}
Expand Down
4 changes: 2 additions & 2 deletions token/services/db/sql/postgres/tokenlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ func NewTokenLockDB(db *sql.DB, k common.NewDBOpts) (driver.TokenLockDB, error)
return &TokenLockDB{TokenLockDB: tldb}, nil
}

func (db *TokenLockDB) Cleanup(evictionDelay time.Duration) error {
func (db *TokenLockDB) Cleanup(leaseExpiry time.Duration) error {
query := fmt.Sprintf(
"DELETE FROM %s "+
"USING %s WHERE %s.consumer_tx_id = %s.tx_id AND (%s.status IN (%d) "+
"OR %s.created_at < NOW() - INTERVAL '%d seconds'"+
");",
db.Table.TokenLocks,
db.Table.Requests, db.Table.TokenLocks, db.Table.Requests, db.Table.Requests, driver.Deleted,
db.Table.TokenLocks, int(evictionDelay.Seconds()),
db.Table.TokenLocks, int(leaseExpiry.Seconds()),
)
db.Logger.Debug(query)
_, err := db.DB.Exec(query)
Expand Down
4 changes: 2 additions & 2 deletions token/services/db/sql/sqlite/tokenlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ type TokenLockDB struct {
*common.TokenLockDB
}

func (db *TokenLockDB) Cleanup(evictionDelay time.Duration) error {
func (db *TokenLockDB) Cleanup(leaseExpiry time.Duration) error {
query := fmt.Sprintf(
"DELETE FROM %s WHERE tx_id IN ("+
"SELECT %s.tx_id FROM %s JOIN %s ON %s.tx_id = %s.tx_id WHERE %s.status IN (%d) "+
"OR %s.created_at < datetime('now', '-%d seconds')"+
");",
db.Table.TokenLocks,
db.Table.TokenLocks, db.Table.TokenLocks, db.Table.Requests, db.Table.TokenLocks, db.Table.Requests, db.Table.Requests, driver.Deleted,
db.Table.TokenLocks, int(evictionDelay.Seconds()),
db.Table.TokenLocks, int(leaseExpiry.Seconds()),
)
db.Logger.Debug(query)
_, err := db.DB.Exec(query)
Expand Down
36 changes: 18 additions & 18 deletions token/services/selector/config/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ import (
)

const (
defaultDriver = driver.Sherdlock
defaultEvictionInterval = 3 * time.Minute
defaultCleanupTickPeriod = 1 * time.Minute
defaultNumRetries = 3
defaultRetryInterval = 5 * time.Second
defaultDriver = driver.Sherdlock
defaultLeaseExpiry = 3 * time.Minute
defaultLeaseCleanupTickPeriod = 1 * time.Minute
defaultNumRetries = 3
defaultRetryInterval = 5 * time.Second
)

type configService interface {
UnmarshalKey(key string, rawVal interface{}) error
}

type Config struct {
Driver driver.Driver `yaml:"driver,omitempty"`
RetryInterval time.Duration `yaml:"retryInterval,omitempty"`
NumRetries int `yaml:"numRetries,omitempty"`
EvictionInterval time.Duration `yaml:"evictionInterval,omitempty"`
CleanupTickPeriod time.Duration `yaml:"cleanupTickPeriod,omitempty"`
Driver driver.Driver `yaml:"driver,omitempty"`
RetryInterval time.Duration `yaml:"retryInterval,omitempty"`
NumRetries int `yaml:"numRetries,omitempty"`
LeaseExpiry time.Duration `yaml:"leaseExpiry,omitempty"`
LeaseCleanupTickPeriod time.Duration `yaml:"leaseCleanupTickPeriod,omitempty"`
}

// New returns a SelectorConfig with the values from the token.selector key
Expand Down Expand Up @@ -64,16 +64,16 @@ func (c *Config) GetRetryInterval() time.Duration {
return defaultRetryInterval
}

func (c *Config) GetEvictionInterval() time.Duration {
if c.EvictionInterval != 0 {
return c.EvictionInterval
func (c *Config) GetLeaseExpiry() time.Duration {
if c.LeaseExpiry != 0 {
return c.LeaseExpiry
}
return defaultEvictionInterval
return defaultLeaseExpiry
}

func (c *Config) GetCleanupTickPeriod() time.Duration {
if c.CleanupTickPeriod != 0 {
return c.CleanupTickPeriod
func (c *Config) GetLeaseCleanupTickPeriod() time.Duration {
if c.LeaseCleanupTickPeriod != 0 {
return c.LeaseCleanupTickPeriod
}
return defaultCleanupTickPeriod
return defaultLeaseCleanupTickPeriod
}
4 changes: 2 additions & 2 deletions token/services/selector/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type SelectorConfig interface {
GetDriver() Driver
GetNumRetries() int
GetRetryInterval() time.Duration
GetEvictionInterval() time.Duration
GetCleanupTickPeriod() time.Duration
GetLeaseExpiry() time.Duration
GetLeaseCleanupTickPeriod() time.Duration
}

type Driver string
2 changes: 1 addition & 1 deletion token/services/selector/sherdlock/inmemory/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ func (l *locker) UnlockByTxID(txID transaction.ID) error {
return nil
}

func (l *locker) Cleanup(evictionDelay time.Duration) error {
func (l *locker) Cleanup(leaseExpiry time.Duration) error {
return nil
}
30 changes: 15 additions & 15 deletions token/services/selector/sherdlock/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Locker interface {
// Cleanup removes the locks such that either:
// 1. The transaction that locked that token is valid or invalid;
// 2. The lock is too old.
Cleanup(evictionDelay time.Duration) error
Cleanup(leaseExpiry time.Duration) error
}

type tokenSelectorUnlocker interface {
Expand All @@ -33,10 +33,10 @@ type tokenSelectorUnlocker interface {
}

type manager struct {
selectorCache lazy2.Provider[transaction.ID, tokenSelectorUnlocker]
locker Locker
evictionInterval time.Duration
cleanupTickPeriod time.Duration
selectorCache lazy2.Provider[transaction.ID, tokenSelectorUnlocker]
locker Locker
leaseExpiry time.Duration
leaseCleanupTickPeriod time.Duration
}

type iterator[k any] interface {
Expand All @@ -50,18 +50,18 @@ func NewManager(
precision uint64,
backoff time.Duration,
maxRetriesAfterBackOff int,
evictionInterval time.Duration,
cleanupTickPeriod time.Duration,
leaseExpiry time.Duration,
leaseCleanupTickPeriod time.Duration,
) *manager {
m := &manager{
locker: locker,
evictionInterval: evictionInterval,
cleanupTickPeriod: cleanupTickPeriod,
locker: locker,
leaseExpiry: leaseExpiry,
leaseCleanupTickPeriod: leaseCleanupTickPeriod,
selectorCache: lazy2.NewProvider(func(txID transaction.ID) (tokenSelectorUnlocker, error) {
return NewSherdSelector(txID, fetcher, locker, precision, backoff, maxRetriesAfterBackOff), nil
}),
}
if cleanupTickPeriod > 0 && evictionInterval > 0 {
if leaseCleanupTickPeriod > 0 && leaseExpiry > 0 {
go m.cleaner()
}
return m
Expand All @@ -83,13 +83,13 @@ func (m *manager) Close(id transaction.ID) error {
}

func (m *manager) cleaner() {
ticker := time.NewTicker(m.cleanupTickPeriod) // Change the duration as needed
ticker := time.NewTicker(m.leaseCleanupTickPeriod)
defer ticker.Stop()

for range ticker.C {
logger.Debugf("cleanup locked tokens with eviction delay of [%s]", m.evictionInterval)
if err := m.locker.Cleanup(m.evictionInterval); err != nil {
logger.Errorf("failed cleaning up eviction locks: %s", err)
logger.Debugf("release token locks older than [%s]", m.leaseExpiry)
if err := m.locker.Cleanup(m.leaseExpiry); err != nil {
logger.Errorf("failed to release token locks: [%s]", err)
}
}
}
28 changes: 14 additions & 14 deletions token/services/selector/sherdlock/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func NewService(fetcherProvider FetcherProvider, tokenLockDBManager *tokenlockdb
}

loader := &loader{
tokenLockDBManager: tokenLockDBManager,
fetcherProvider: fetcherProvider,
retryInterval: cfg.GetRetryInterval(),
numRetries: cfg.GetNumRetries(),
evictionInterval: cfg.GetEvictionInterval(),
cleanupTickPeriod: cfg.GetCleanupTickPeriod(),
tokenLockDBManager: tokenLockDBManager,
fetcherProvider: fetcherProvider,
retryInterval: cfg.GetRetryInterval(),
numRetries: cfg.GetNumRetries(),
leaseExpiry: cfg.GetLeaseExpiry(),
leaseCleanupTickPeriod: cfg.GetLeaseCleanupTickPeriod(),
}
return &SelectorService{
managerLazyCache: lazy2.NewProviderWithKeyMapper(key, loader.load),
Expand All @@ -49,12 +49,12 @@ func (s *SelectorService) SelectorManager(tms *token.ManagementService) (token.S
}

type loader struct {
tokenLockDBManager *tokenlockdb.Manager
fetcherProvider FetcherProvider
numRetries int
retryInterval time.Duration
evictionInterval time.Duration
cleanupTickPeriod time.Duration
tokenLockDBManager *tokenlockdb.Manager
fetcherProvider FetcherProvider
numRetries int
retryInterval time.Duration
leaseExpiry time.Duration
leaseCleanupTickPeriod time.Duration
}

func (s *loader) load(tms *token.ManagementService) (token.SelectorManager, error) {
Expand All @@ -76,8 +76,8 @@ func (s *loader) load(tms *token.ManagementService) (token.SelectorManager, erro
pp.Precision(),
s.retryInterval,
s.numRetries,
s.evictionInterval,
s.cleanupTickPeriod,
s.leaseExpiry,
s.leaseCleanupTickPeriod,
), nil
}

Expand Down

0 comments on commit f4c5bb7

Please sign in to comment.