tf: A token change. Make the 'immortal' bit be part of the atomic
reference count.  This is required for thread-safety.  Instead of
eagerly reaping tokens whose reference counts go to zero, sweep for
garbage when an insert would trigger a rehash.

(Internal change: 2307757)
gitamohr authored and pixar-oss committed Dec 7, 2023
1 parent fc5479c commit 66fcf72
Showing 2 changed files with 203 additions and 156 deletions.
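
The commit message describes two linked changes: the 'counted' (i.e. non-immortal) flag moves into the low bit of the atomic reference count, and reclamation becomes a deferred sweep. The sketch below illustrates just the counting scheme; the standalone Rep type and its method names are assumptions for illustration, not the USD API, but the constants match the diff (a fresh counted rep starts at 3, and a garbage rep reads exactly 1).

// Minimal sketch of the bit-packing this commit adopts, assuming a
// standalone Rep type (illustrative; not the actual TfToken::_Rep).
// Low bit: 1 = counted (mortal), 0 = immortal.  Each reference adds 2.
#include <atomic>
#include <cstdio>

struct Rep {
    std::atomic<unsigned> refCount{0};

    // A freshly inserted counted rep: counted bit (1) plus one ref (2).
    void initCounted()  { refCount.store(3, std::memory_order_relaxed); }
    // Immortal reps never carry the counted bit, so they are never swept.
    void initImmortal() { refCount.store(0, std::memory_order_relaxed); }

    bool isCounted() const {
        return refCount.load(std::memory_order_relaxed) & 1;
    }
    void addRef()  { refCount.fetch_add(2, std::memory_order_relaxed); }
    void release() { refCount.fetch_sub(2, std::memory_order_release); }

    // A counted rep with no outstanding references reads exactly 1.
    bool isGarbage() const {
        return refCount.load(std::memory_order_relaxed) == 1;
    }
};

int main() {
    Rep r;
    r.initCounted();   // 3: counted bit + one reference
    r.addRef();        // 5
    r.release();       // 3
    r.release();       // 1: garbage, reclaimable by a sweep
    std::printf("garbage: %d\n", int(r.isGarbage()));
}

Because an immortal rep never has the low bit set, making an existing token immortal is a single relaxed fetch_and(~1u), which is exactly what the new _GetPtrImpl below does.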
201 changes: 115 additions & 86 deletions pxr/base/tf/token.cpp
@@ -39,12 +39,14 @@

#include <tbb/spin_mutex.h>

#include <string>
#include <ostream>
#include <vector>
#include <atomic>
#include <algorithm>
#include <cstring>
#include <new>
#include <ostream>
#include <string>
#include <utility>
#include <algorithm>
#include <vector>

using std::vector;
using std::string;
@@ -85,29 +87,23 @@ struct Tf_TokenRegistry
typedef _Hash<7> _OuterHash;
typedef _Hash<5> _InnerHash;

static const size_t _NumSets = 128;
static const size_t _SetMask = _NumSets-1;

// Utility to pad an instance to take up a whole number of cache lines to
// avoid false sharing.
template <class T>
struct _CacheLinePadded {
T val;
char _unused_padding[
ARCH_CACHE_LINE_SIZE-(sizeof(T) % ARCH_CACHE_LINE_SIZE)];
};
static constexpr unsigned _NumSets = 128;
static constexpr unsigned _SetMask = _NumSets-1;
static constexpr size_t _MinInsertsUntilSweepCheck = 32;

// TODO: Consider a better container than TfHashSet here.
using _RepSet = TfHashSet<TfToken::_Rep, _InnerHash, _Eq>;

// It's not ideal to use TfHashSet here. It would be better to use
// boost::multi_index from boost 1.56 or newer: it lets us lookup an element
// by a key more easily. It also has better performance. Earlier versions
// suffer from the issue where erase(iterator) has to return an iterator to
// the next element, which has to scan forward to find the next, so we use
// TfHashSet for now.
typedef TfHashSet<TfToken::_Rep, _InnerHash, _Eq> _RepSet;
// Align this structure to ensure no false sharing of 'mutex'.
struct alignas(ARCH_CACHE_LINE_SIZE)
_Set {
_RepSet reps;
unsigned insertsUntilSweepCheck = _MinInsertsUntilSweepCheck;
mutable tbb::spin_mutex mutex;
};

// Data members.
_RepSet _sets[_NumSets];
mutable _CacheLinePadded<tbb::spin_mutex> _locks[_NumSets];
_Set _sets[_NumSets];

static Tf_TokenRegistry& _GetInstance() {
return TfSingleton<Tf_TokenRegistry>::GetInstance();
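
The hunk above also replaces the hand-rolled _CacheLinePadded wrapper with alignas on the whole per-set struct. A minimal sketch of the idea, under the assumption of a 64-byte cache line (the real code uses ARCH_CACHE_LINE_SIZE) and with the illustrative name Shard:

// Sketch of the false-sharing fix: aligning each per-set struct to a
// cache line keeps one shard's spin mutex from sharing a line with its
// neighbor's.  64 is a stand-in for ARCH_CACHE_LINE_SIZE.
#include <tbb/spin_mutex.h>
#include <cstddef>

constexpr std::size_t kCacheLine = 64;  // assumption; USD uses ARCH_CACHE_LINE_SIZE

struct alignas(kCacheLine) Shard {
    unsigned insertsUntilSweepCheck = 32;
    mutable tbb::spin_mutex mutex;
};

// alignas also rounds sizeof(Shard) up to a multiple of the alignment,
// so an array of shards puts each mutex on its own cache line with no
// explicit padding member.
static_assert(sizeof(Shard) % kCacheLine == 0, "one shard per cache line");
Shard shards[128];

Besides being shorter, this avoids the old wrapper's edge case: when sizeof(T) was already a whole multiple of the line size, its padding array added a full extra cache line.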
@@ -136,48 +132,26 @@ struct Tf_TokenRegistry
return _FindPtrImpl<string const &>(s);
}

/*
* rep may be dying. remove its key and raw pointer reference
* from the table if it truly is.
*/
void _PossiblyDestroyRep(_RepPtr rep) {
bool repFoundInSet = true;
string repString;
{
unsigned int setNum = rep->_setNum;

tbb::spin_mutex::scoped_lock lock(_locks[setNum].val);

if (!rep->_isCounted)
return;

/*
* We hold the lock, but there could be others outside the lock
* modifying this same counter. Be safe: be atomic.
*/
if (--rep->_refCount != 0)
return;

if (!_sets[setNum].erase(*rep)) {
repFoundInSet = false;
repString = rep->_str;
}
}
TF_VERIFY(repFoundInSet,
"failed to find token '%s' in table for destruction",
repString.c_str());
}

void _DumpStats() const {
std::vector<std::pair<size_t, size_t> > sizesWithSet;
for (size_t i = 0; i != _NumSets; ++i) {
sizesWithSet.push_back(std::make_pair(_sets[i].size(), i));
tbb::spin_mutex::scoped_lock lock(_sets[i].mutex);
sizesWithSet.push_back(std::make_pair(_sets[i].reps.size(), i));
}
std::sort(sizesWithSet.begin(), sizesWithSet.end());
printf("Set # -- Size\n");
TF_FOR_ALL(i, sizesWithSet) {
printf("%zu -- %zu\n", i->second, i->first);
}

// Uncomment to dump every token & refcount.
// for (size_t i = 0; i != _NumSets; ++i) {
// tbb::spin_mutex::scoped_lock lock(_sets[i].mutex);
// for (auto const &rep: _sets[i].reps) {
// printf("%d '%s'\n", rep._refCount.load(), rep._str.c_str());
// }
// }

}

private:
@@ -212,53 +186,114 @@
*/
template <class Str>
inline _RepPtrAndBits _GetPtrImpl(Str s, bool makeImmortal) {
if (_IsEmpty(s))
if (_IsEmpty(s)) {
return _RepPtrAndBits();
}

size_t setNum = _GetSetNum(_CStr(s));
unsigned setNum = _GetSetNum(_CStr(s));
_Set &set = _sets[setNum];

tbb::spin_mutex::scoped_lock lock(_locks[setNum].val);
tbb::spin_mutex::scoped_lock lock(set.mutex);

// Insert or lookup an existing.
_RepSet::iterator iter = _sets[setNum].find(_LookupRep(_CStr(s)));
if (iter != _sets[setNum].end()) {
_RepSet::iterator iter = set.reps.find(_LookupRep(_CStr(s)));
if (iter != set.reps.end()) {
_RepPtr rep = &(*iter);
bool isCounted = rep->_isCounted;
bool isCounted = rep->_refCount.load(std::memory_order_relaxed) & 1;
if (isCounted) {
if (makeImmortal)
isCounted = rep->_isCounted = false;
else
++rep->_refCount;
if (makeImmortal) {
// Clear counted flag.
rep->_refCount.fetch_and(~1u, std::memory_order_relaxed);
isCounted = false;
}
else {
// Add one reference (we count by 2's).
rep->_refCount.fetch_add(2, std::memory_order_relaxed);
}
}
return _RepPtrAndBits(rep, isCounted);
} else {
// No entry present, add a new entry.
// No entry present, add a new entry. First check if we should
// clean house.
if (set.insertsUntilSweepCheck == 0) {
// If this insert would cause a rehash, clean house first.
// Remove any reps that have refcount zero. This is safe to do
// since we hold a lock on the set's mutex: no other thread can
// observe or manipulate this set concurrently.
if ((float(set.reps.size() + 1) /
float(set.reps.bucket_count()) >
set.reps.max_load_factor())) {
// Walk the set and erase any non-immortal reps with no
// outstanding refcounts (_refCount == 1). Recall that the
// low-order bit indicates counted (non-immortal) reps, we
// count references by 2.
for (auto iter = set.reps.begin(),
end = set.reps.end(); iter != end; ) {
if (iter->_refCount == 1) {
set.reps.erase(iter++);
}
else {
++iter;
}
}
// Reset insertsUntilSweepCheck. We want to wait at least a
// bit to avoid bad patterns. For example if we sweep the
// table and find one garbage token, we will remove it and
// then insert the new token. On the next insert we'll
// sweep for garbage again. If the calling code is
// consuming a list of tokens, discarding one and creating a
// new one on each iteration, we could end up sweeping the
// whole table like this on every insertion. By waiting a
// bit we avoid these situations at the cost of the
// occasional "unnecessary" rehash.
set.insertsUntilSweepCheck = std::max(
_MinInsertsUntilSweepCheck,
static_cast<size_t>(
set.reps.bucket_count() *
float(set.reps.max_load_factor() -
set.reps.load_factor())));
}
}
else {
--set.insertsUntilSweepCheck;
}

// Create the new entry.
TfAutoMallocTag noname("TfToken");
_RepPtr rep = &(*_sets[setNum].insert(TfToken::_Rep(s)).first);
rep->_isCounted = !makeImmortal;
rep->_setNum = setNum;
rep->_compareCode = _ComputeCompareCode(rep->_cstr);
if (!makeImmortal)
rep->_refCount = 1;
uint64_t compareCode = _ComputeCompareCode(_CStr(s));
_RepPtr rep = &(*set.reps.insert(
TfToken::_Rep(s, setNum, compareCode)).first);
// 3, because 1 for counted bit plus 2 for one refcount. We can
// store relaxed here since our lock on the set's mutex provides
// synchronization.
rep->_refCount.store(makeImmortal ? 0 : 3,
std::memory_order_relaxed);
return _RepPtrAndBits(rep, !makeImmortal);
}
}

template <class Str>
_RepPtrAndBits _FindPtrImpl(Str s) const {
if (_IsEmpty(s))
if (_IsEmpty(s)) {
return _RepPtrAndBits();
}

size_t setNum = _GetSetNum(_CStr(s));
_Set const &set = _sets[setNum];

tbb::spin_mutex::scoped_lock lock(_locks[setNum].val);
tbb::spin_mutex::scoped_lock lock(set.mutex);

_RepSet::const_iterator iter =
_sets[setNum].find(_LookupRep(_CStr(s)));
if (iter == _sets[setNum].end())
_RepSet::const_iterator iter = set.reps.find(_LookupRep(_CStr(s)));
if (iter == set.reps.end()) {
return _RepPtrAndBits();
}

TfToken::_Rep const *rep = &(*iter);

unsigned oldVal =
rep->_refCount.fetch_add(2, std::memory_order_relaxed);

return _RepPtrAndBits(&(*iter), iter->IncrementIfCounted());
return _RepPtrAndBits(rep, oldVal & 1);
}

};
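
The insertion path above is the heart of the change: instead of erasing a rep when its count reaches zero, _GetPtrImpl sweeps its set for garbage only when the next insert would push the table past its max load factor, then backs off before checking again. Here is a self-contained sketch of that policy, assuming std::unordered_set-style containers and a caller-supplied garbage predicate as stand-ins for TfHashSet and the refcount test:

// Sketch of the sweep-before-rehash policy in _GetPtrImpl, with generic
// stand-ins for the container and the garbage test.
#include <algorithm>
#include <cstddef>

constexpr std::size_t kMinInsertsUntilSweepCheck = 32;

template <class Set, class IsGarbage>
void SweepIfInsertWouldRehash(Set &reps,
                              std::size_t &insertsUntilSweepCheck,
                              IsGarbage isGarbage)
{
    if (insertsUntilSweepCheck != 0) {
        --insertsUntilSweepCheck;
        return;
    }
    // Same load-factor test as the diff: would one more element push the
    // table past its max load factor, i.e. trigger a rehash?
    if (float(reps.size() + 1) / float(reps.bucket_count()) >
            reps.max_load_factor()) {
        // Reclaim garbage instead of growing; safe under the set's lock.
        for (auto it = reps.begin(); it != reps.end(); ) {
            if (isGarbage(*it)) {
                it = reps.erase(it);
            } else {
                ++it;
            }
        }
        // Back off before sweeping again, in proportion to the headroom
        // the sweep created -- the anti-thrashing reset the diff's long
        // comment explains.
        insertsUntilSweepCheck = std::max(
            kMinInsertsUntilSweepCheck,
            std::size_t(reps.bucket_count() *
                        (reps.max_load_factor() - reps.load_factor())));
    }
}

// Usage on the miss path, just before inserting a new rep, e.g.:
//   SweepIfInsertWouldRehash(set.reps, set.insertsUntilSweepCheck,
//       [](auto const &rep) { return rep._refCount == 1; });

Note also that the lookup path (_FindPtrImpl) now performs one unconditional fetch_add(2): for an immortal rep the extra 2 is harmless, since the low bit stays clear and the count is never consulted, so the hot path avoids a compare-and-branch.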
@@ -272,12 +307,6 @@ TF_REGISTRY_FUNCTION(TfType)
.Alias( TfType::GetRoot(), "vector<TfToken>" );
}

void
TfToken::_PossiblyDestroyRep() const
{
Tf_TokenRegistry::_GetInstance()._PossiblyDestroyRep(_rep.Get());
}

string const&
TfToken::_GetEmptyString()
{

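The final hunk deletes TfToken::_PossiblyDestroyRep outright, since dropping the last reference no longer needs to lock a set or erase anything. A hypothetical holder-side sketch (TokenHandle and Rep here are illustrative, not the real TfToken internals) of what release now amounts to:

// With the sweep in place, releasing a counted reference just subtracts
// 2 and leaves the rep in its set until the next pre-rehash sweep.
#include <atomic>

struct Rep { std::atomic<unsigned> refCount{3}; };  // counted bit + 1 ref

struct TokenHandle {
    Rep *rep = nullptr;
    ~TokenHandle() {
        if (rep && (rep->refCount.load(std::memory_order_relaxed) & 1)) {
            // No table lock, no erase; the rep becomes garbage (count == 1).
            rep->refCount.fetch_sub(2, std::memory_order_release);
        }
    }
};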