Skip to content

Commit

Permalink
sql: clarify that tableVersion refcount and leased are protected
Browse files Browse the repository at this point in the history
Also remove reference from tableState to tableNameCache.

Release note: None
  • Loading branch information
vivekmenezes committed Aug 10, 2018
1 parent 7272ace commit eb22d4a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 36 deletions.
5 changes: 4 additions & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,10 @@ CREATE TABLE crdb_internal.leases (
continue
}

if !state.leased {
state.mu.Lock()
leased := state.mu.leased
state.mu.Unlock()
if !leased {
continue
}
expCopy := state.leaseExpiration()
Expand Down
73 changes: 40 additions & 33 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,29 @@ type tableVersionState struct {
// isn't associated with a lease.
expiration hlc.Timestamp

// mu protects refcount and leased.
mu syncutil.Mutex
refcount int
// Set if the node has a lease on this descriptor version.
// Leases can only be held for the two latest versions of
// a table descriptor. The latest version known to a node
// (can be different than the current latest version in the store)
// is always associated with a lease. The previous version known to
// a node might not necessarily be associated with a lease.
leased bool
mu struct {
syncutil.Mutex

refcount int
// Set if the node has a lease on this descriptor version.
// Leases can only be held for the two latest versions of
// a table descriptor. The latest version known to a node
// (can be different than the current latest version in the store)
// is always associated with a lease. The previous version known to
// a node might not necessarily be associated with a lease.
leased bool
}
}

func (s *tableVersionState) String() string {
return fmt.Sprintf("%d(%q) ver=%d:%s, refcount=%d", s.ID, s.Name, s.Version, s.expiration, s.refcount)
s.mu.Lock()
defer s.mu.Unlock()
return s.stringLocked()
}

// stringLocked reads mu.refcount and thus needs to have mu held.
func (s *tableVersionState) stringLocked() string {
return fmt.Sprintf("%d(%q) ver=%d:%s, refcount=%d", s.ID, s.Name, s.Version, s.expiration, s.mu.refcount)
}

// hasExpired checks if the table is too old to be used (by a txn operating)
Expand All @@ -96,8 +105,8 @@ func (s *tableVersionState) incRefcount() {
}

func (s *tableVersionState) incRefcountLocked() {
s.refcount++
log.VEventf(context.TODO(), 2, "tableVersionState.incRef: %s", s)
s.mu.refcount++
log.VEventf(context.TODO(), 2, "tableVersionState.incRef: %s", s.stringLocked())
}

// The lease expiration stored in the database is of a different type.
Expand Down Expand Up @@ -163,8 +172,8 @@ func (s LeaseStore) acquire(ctx context.Context, tableID sqlbase.ID) (*tableVers
table = &tableVersionState{
TableDescriptor: *tableDesc,
expiration: expiration,
leased: true,
}
table.mu.leased = true

// ValidateTable instead of Validate, even though we have a txn available,
// so we don't block reads waiting for this table version.
Expand Down Expand Up @@ -601,10 +610,8 @@ func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionS
}

type tableState struct {
id sqlbase.ID
// The cache is updated every time we acquire or release a table.
tableNameCache *tableNameCache
stopper *stop.Stopper
id sqlbase.ID
stopper *stop.Stopper

// renewalInProgress is an atomic indicator for when a renewal for a
// lease has begun. This is atomic to prevent multiple routines from
Expand Down Expand Up @@ -818,12 +825,12 @@ func (t *tableState) upsertLocked(
s.mu.Lock()
table.mu.Lock()
// subsume the refcount of the older lease.
table.refcount += s.refcount
s.refcount = 0
s.leased = false
table.mu.refcount += s.mu.refcount
s.mu.refcount = 0
s.mu.leased = false
log.VEventf(ctx, 2, "replaced lease: %s with %s", s.stringLocked(), table.stringLocked())
table.mu.Unlock()
s.mu.Unlock()
log.VEventf(ctx, 2, "replaced lease: %s with %s", s, table)
t.mu.active.remove(s)
t.mu.active.insert(table)
return s
Expand All @@ -839,10 +846,10 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) []*tableVersionStat
func() {
table.mu.Lock()
defer table.mu.Unlock()
if table.refcount == 0 {
if table.mu.refcount == 0 {
t.mu.active.remove(table)
if table.leased {
table.leased = false
if table.mu.leased {
table.mu.leased = false
leases = append(leases, table)
}
}
Expand Down Expand Up @@ -873,7 +880,7 @@ func acquireNodeLease(ctx context.Context, m *LeaseManager, id sqlbase.ID) (bool
t.mu.Lock()
defer t.mu.Unlock()
toRelease = t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)
m.tableNames.insert(table)
return leaseToken(table), nil
})
result := <-resultChan
Expand Down Expand Up @@ -916,14 +923,14 @@ func (t *tableState) release(

s.mu.Lock()
defer s.mu.Unlock()
s.refcount--
log.VEventf(context.TODO(), 2, "release: %s", s)
if s.refcount < 0 {
s.mu.refcount--
log.VEventf(context.TODO(), 2, "release: %s", s.stringLocked())
if s.mu.refcount < 0 {
panic(fmt.Sprintf("negative ref count: %s", s))
}

if s.refcount == 0 && s.leased && removeOnceDereferenced {
s.leased = false
if s.mu.refcount == 0 && s.mu.leased && removeOnceDereferenced {
s.mu.leased = false
return true
}
return false
Expand Down Expand Up @@ -1152,7 +1159,7 @@ func (c *tableNameCache) get(
dbID, tableName, table.ID, table.ParentID, table.Name))
}

if !table.leased {
if !table.mu.leased {
// This get() raced with a release operation. The leaseManager should remove
// this cache entry soon.
return nil
Expand Down Expand Up @@ -1511,7 +1518,7 @@ func (m *LeaseManager) findTableState(tableID sqlbase.ID, create bool) *tableSta
defer m.mu.Unlock()
t := m.mu.tables[tableID]
if t == nil && create {
t = &tableState{id: tableID, tableNameCache: &m.tableNames, stopper: m.stopper}
t = &tableState{id: tableID, stopper: m.stopper}
m.mu.tables[tableID] = t
}
return t
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/pgwire_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ func TestPGWireConnectionCloseReleasesLeases(t *testing.T) {
// Wait for the lease to be released.
testutils.SucceedsSoon(t, func() error {
ts.mu.Lock()
refcount := ts.mu.active.data[0].refcount
ts.mu.Unlock()
defer ts.mu.Unlock()
tv := ts.mu.active.data[0]
tv.mu.Lock()
defer tv.mu.Unlock()
refcount := tv.mu.refcount
if refcount != 0 {
return errors.Errorf(
"expected lease to be unused, found refcount: %d", refcount)
Expand Down

0 comments on commit eb22d4a

Please sign in to comment.