Skip to content

Commit

Permalink
Put arena cleanup nodes on a separate chunked list instead of at the …
Browse files Browse the repository at this point in the history
…ends of arena blocks.

The motivation is (a) better data locality during SerialArena::CleanupList and (b) simplification of arena layout.

PiperOrigin-RevId: 631173641
  • Loading branch information
protobuf-github-bot authored and copybara-github committed May 6, 2024
1 parent 24f27c3 commit f70d90b
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 182 deletions.
1 change: 1 addition & 0 deletions src/google/protobuf/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,7 @@ cc_test(
}),
deps = [
":arena",
":arena_cleanup",
":cc_test_protos",
":lite_test_util",
":port",
Expand Down
191 changes: 119 additions & 72 deletions src/google/protobuf/arena.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,43 @@ ArenaBlock* SentryArenaBlock() {
}
#endif

SizedPtr AllocateMemory(const AllocationPolicy* policy_ptr, size_t last_size,
size_t min_bytes) {
// Returns the number of bytes to request for the next allocation in a
// doubling growth sequence.
//
//   last_size:  size of the previous allocation, or 0 if there was none.
//   start_size: returned as-is when there is no previous allocation.
//   max_size:   cap applied to the doubled size.
//
// The cap is applied without ever evaluating a `2 * last_size` that exceeds
// `max_size`, so an extremely large `last_size` cannot wrap around to a small
// (or zero) result.
inline size_t AllocationSize(size_t last_size, size_t start_size,
                             size_t max_size) {
  if (last_size == 0) return start_size;
  // Double the current block size, up to a limit. For unsigned integers,
  // `last_size > max_size / 2` holds exactly when the mathematical value of
  // `2 * last_size` is greater than `max_size`.
  if (last_size > max_size / 2) return max_size;
  return 2 * last_size;
}

// Obtains `size` bytes of raw memory according to `policy`.
//
// When the policy supplies a `block_alloc` callback, that callback is invoked
// and the result is reported with exactly the requested size. Otherwise the
// request is forwarded to AllocateAtLeast(), whose returned SizedPtr is passed
// through unchanged.
SizedPtr AllocateMemory(const AllocationPolicy& policy, size_t size) {
  if (policy.block_alloc != nullptr) {
    return {policy.block_alloc(size), size};
  }
  return AllocateAtLeast(size);
}

SizedPtr AllocateBlock(const AllocationPolicy* policy_ptr, size_t last_size,
size_t min_bytes) {
AllocationPolicy policy; // default policy
if (policy_ptr) policy = *policy_ptr;
size_t size;
if (last_size != 0) {
// Double the current block size, up to a limit.
auto max_size = policy.max_block_size;
size = std::min(2 * last_size, max_size);
} else {
size = policy.start_block_size;
}
size_t size =
AllocationSize(last_size, policy.start_block_size, policy.max_block_size);
// Verify that min_bytes + kBlockHeaderSize won't overflow.
ABSL_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() -
SerialArena::kBlockHeaderSize);
size = std::max(size, SerialArena::kBlockHeaderSize + min_bytes);

if (policy.block_alloc == nullptr) {
return AllocateAtLeast(size);
}
return {policy.block_alloc(size), size};
return AllocateMemory(policy, size);
}

// Allocates the memory for the next cleanup chunk.
//
//   policy_ptr: allocation policy to honour; may be null, in which case the
//               memory comes straight from AllocateAtLeast().
//   last_size:  byte size of the previously allocated chunk, or 0 when this
//               is the first chunk.
//
// Chunk sizes start at 64 bytes and double on every call up to a 4 KiB cap.
SizedPtr AllocateCleanupChunk(const AllocationPolicy* policy_ptr,
                              size_t last_size) {
  constexpr size_t kStartSize = 64;
  constexpr size_t kMaxSize = 4 << 10;
  static_assert(kStartSize % sizeof(cleanup::CleanupNode) == 0, "");

  const size_t chunk_bytes = AllocationSize(last_size, kStartSize, kMaxSize);
  if (policy_ptr != nullptr) {
    return AllocateMemory(*policy_ptr, chunk_bytes);
  }
  return AllocateAtLeast(chunk_bytes);
}

class GetDeallocator {
Expand All @@ -102,14 +118,95 @@ class GetDeallocator {

} // namespace

namespace cleanup {
// Header placed at the start of every cleanup chunk allocation. The cleanup
// nodes themselves are stored in the same allocation, immediately after this
// header.
struct ChunkList::Chunk {
  // Chunk that was the list head when this one was created (null if none).
  Chunk* next;
  // Total byte size of the allocation holding this header and its nodes.
  size_t size;
  // Cleanup nodes follow.

  // Number of whole cleanup nodes that fit in an allocation of `bytes` bytes
  // once the header has been accounted for.
  static size_t Capacity(size_t bytes) {
    return (bytes - sizeof(Chunk)) / sizeof(CleanupNode);
  }

  // Node capacity of this particular chunk.
  size_t Capacity() const { return Capacity(size); }

  // Address of the first node slot, directly behind the header.
  CleanupNode* First() { return reinterpret_cast<CleanupNode*>(this + 1); }

  // Address of the final node slot in this chunk.
  CleanupNode* Last() { return First() + Capacity() - 1; }
};

void ChunkList::AddFallback(void* elem, void (*destructor)(void*),
SerialArena& arena) {
ABSL_DCHECK_EQ(next_, limit_);
SizedPtr mem = AllocateCleanupChunk(arena.parent_.AllocPolicy(),
head_ == nullptr ? 0 : head_->size);
arena.AddSpaceAllocated(mem.n);
head_ = new (mem.p) Chunk{head_, mem.n};
next_ = head_->First();
prefetch_ptr_ = reinterpret_cast<char*>(next_);
limit_ = next_ + Chunk::Capacity(mem.n);
AddFromExisting(elem, destructor);
}

// Runs the destructor of every registered cleanup node and hands each chunk
// to the deallocator built from `arena`'s allocation policy.
//
// Nodes are visited from the most recently added one backwards: the head
// chunk is walked from `next_ - 1` down to its first slot, and every following
// chunk is walked from its last slot down to its first (such chunks are
// treated as completely full).
//
// This function does not modify `head_`, `next_` or `limit_`; after it
// returns they still refer to chunks that were passed to the deallocator.
// ThreadSafeArena::Reset() assigns a fresh ChunkList after calling
// CleanupList() for that reason.
void ChunkList::Cleanup(const SerialArena& arena) {
Chunk* c = head_;
// Nothing was ever added: there is no chunk to walk or free.
if (c == nullptr) return;
GetDeallocator deallocator(arena.parent_.AllocPolicy());

// Iterate backwards in order to destroy in the right order.
CleanupNode* it = next_ - 1;
while (true) {
CleanupNode* first = c->First();
// A prefetch distance of 8 here was chosen arbitrarily.
constexpr int kPrefetchDistance = 8;
// `prefetch` runs ahead of `it` (both move towards `first`).
CleanupNode* prefetch = it;
// Prefetch the first kPrefetchDistance nodes.
for (int i = 0; prefetch >= first && i < kPrefetchDistance;
--prefetch, ++i) {
prefetch->Prefetch();
}
// For the middle nodes, run destructor and prefetch the node
// kPrefetchDistance after the current one.
for (; prefetch >= first; --it, --prefetch) {
it->Destroy();
prefetch->Prefetch();
}
// Note: we could consider prefetching `next` chunk earlier.
absl::PrefetchToLocalCacheNta(c->next);
// Destroy the rest without prefetching.
for (; it >= first; --it) {
it->Destroy();
}
// Read the link before releasing the chunk: the header is stored inside the
// memory that is handed to the deallocator.
Chunk* next = c->next;
deallocator({c, c->size});
if (next == nullptr) return;
c = next;
// Continue from the last slot of the next chunk.
it = c->Last();
};
}

std::vector<void*> ChunkList::PeekForTesting() {
std::vector<void*> ret;
Chunk* c = head_;
if (c == nullptr) return ret;
// Iterate backwards to match destruction order.
CleanupNode* it = next_ - 1;
while (true) {
CleanupNode* first = c->First();
for (; it >= first; --it) {
ret.push_back(it->elem);
}
c = c->next;
if (c == nullptr) return ret;
it = c->Last();
};
}
} // namespace cleanup

// It is guaranteed that this is constructed in `b`. IOW, this is not the first
// arena and `b` cannot be sentry.
SerialArena::SerialArena(ArenaBlock* b, ThreadSafeArena& parent)
: ptr_{b->Pointer(kBlockHeaderSize + ThreadSafeArena::kSerialArenaSize)},
limit_{b->Limit()},
prefetch_ptr_(
b->Pointer(kBlockHeaderSize + ThreadSafeArena::kSerialArenaSize)),
prefetch_limit_(b->Limit()),
head_{b},
space_allocated_{b->size},
parent_{parent} {
Expand All @@ -130,22 +227,7 @@ SerialArena::SerialArena(FirstSerialArena, ArenaBlock* b,
}

std::vector<void*> SerialArena::PeekCleanupListForTesting() {
std::vector<void*> res;

ArenaBlock* b = head();
if (b->IsSentry()) return res;

const auto peek_list = [&](char* pos, char* end) {
for (; pos != end; pos += cleanup::Size()) {
cleanup::PeekNode(pos, res);
}
};

peek_list(limit_, b->Limit());
for (b = b->next; b; b = b->next) {
peek_list(reinterpret_cast<char*>(b->cleanup_nodes), b->Limit());
}
return res;
return cleanup_list_.PeekForTesting();
}

std::vector<void*> ThreadSafeArena::PeekCleanupListForTesting() {
Expand Down Expand Up @@ -223,25 +305,16 @@ void* SerialArena::AllocateFromStringBlockFallback() {
PROTOBUF_NOINLINE
void* SerialArena::AllocateAlignedWithCleanupFallback(
size_t n, size_t align, void (*destructor)(void*)) {
size_t required = AlignUpTo(n, align) + cleanup::Size();
size_t required = AlignUpTo(n, align);
AllocateNewBlock(required);
return AllocateAlignedWithCleanup(n, align, destructor);
}

// Registers a cleanup entry for `elem` after first switching to a newly
// allocated block. The new block is requested with room for at least one
// cleanup node (cleanup::Size() bytes); the entry itself is then written by
// AddCleanupFromExisting().
PROTOBUF_NOINLINE
void SerialArena::AddCleanupFallback(void* elem, void (*destructor)(void*)) {
  const size_t node_bytes = cleanup::Size();
  AllocateNewBlock(node_bytes);
  AddCleanupFromExisting(elem, destructor);
}

void SerialArena::AllocateNewBlock(size_t n) {
size_t used = 0;
size_t wasted = 0;
ArenaBlock* old_head = head();
if (!old_head->IsSentry()) {
// Sync limit to block
old_head->cleanup_nodes = limit_;

// Record how much used in this block.
used = static_cast<size_t>(ptr() - old_head->Pointer(kBlockHeaderSize));
wasted = old_head->size - used - kBlockHeaderSize;
Expand All @@ -253,7 +326,7 @@ void SerialArena::AllocateNewBlock(size_t n) {
// but with a CPU regression. The regression might have been an artifact of
// the microbenchmark.

auto mem = AllocateMemory(parent_.AllocPolicy(), old_head->size, n);
auto mem = AllocateBlock(parent_.AllocPolicy(), old_head->size, n);
AddSpaceAllocated(mem.n);
ThreadSafeArenaStats::RecordAllocateStats(parent_.arena_stats_.MutableStats(),
/*used=*/used,
Expand Down Expand Up @@ -314,34 +387,6 @@ size_t SerialArena::FreeStringBlocks(StringBlock* string_block,
return deallocated;
}

// Runs the destructors of all cleanup nodes stored in this arena's blocks.
//
// Starting at head() and following `next` links, each block's nodes are read
// from `b->cleanup_nodes` up to `b->Limit()` in steps of cleanup::Size()
// bytes. Does nothing when the head block is the sentry.
void SerialArena::CleanupList() {
ArenaBlock* b = head();
if (b->IsSentry()) return;

// Store the arena's current `limit_` into the head block so the loop below
// can read every block's node start uniformly from `cleanup_nodes`.
b->cleanup_nodes = limit_;
do {
char* limit = b->Limit();
char* it = reinterpret_cast<char*>(b->cleanup_nodes);
ABSL_DCHECK(!b->IsSentry() || it == limit);
// A prefetch distance of 8 here was chosen arbitrarily.
// `prefetch` runs ahead of `it`. Note that the counter is pre-decremented
// inside the loop condition, so this first loop issues at most 7 prefetches.
char* prefetch = it;
int prefetch_dist = 8;
for (; prefetch < limit && --prefetch_dist; prefetch += cleanup::Size()) {
cleanup::PrefetchNode(prefetch);
}
// While there are still nodes left to prefetch, destroy the current node
// and prefetch the one further ahead.
for (; prefetch < limit;
it += cleanup::Size(), prefetch += cleanup::Size()) {
cleanup::DestroyNode(it);
cleanup::PrefetchNode(prefetch);
}
// Start fetching the next block header before finishing this block.
absl::PrefetchToLocalCacheNta(b->next);
// Destroy the remaining nodes; nothing is left to prefetch in this block.
for (; it < limit; it += cleanup::Size()) {
cleanup::DestroyNode(it);
}
b = b->next;
} while (b);
}

// Stores arrays of void* and SerialArena* instead of linked list of
// SerialArena* to speed up traversing all SerialArena. The cost of walk is non
// trivial when there are many nodes. Separately storing "ids" minimizes cache
Expand Down Expand Up @@ -550,7 +595,7 @@ ArenaBlock* ThreadSafeArena::FirstBlock(void* buf, size_t size,

SizedPtr mem;
if (buf == nullptr || size < kBlockHeaderSize + kAllocPolicySize) {
mem = AllocateMemory(&policy, 0, kAllocPolicySize);
mem = AllocateBlock(&policy, 0, kAllocPolicySize);
} else {
mem = {buf, size};
// Record user-owned block.
Expand Down Expand Up @@ -734,6 +779,8 @@ uint64_t ThreadSafeArena::Reset() {
// Have to do this in a first pass, because some of the destructors might
// refer to memory in other blocks.
CleanupList();
// Reset the first arena's cleanup list.
first_arena_.cleanup_list_ = cleanup::ChunkList();

// Discard all blocks except the first one. Whether it is user-provided or
// allocated, always reuse the first block for the first arena.
Expand Down Expand Up @@ -913,7 +960,7 @@ SerialArena* ThreadSafeArena::GetSerialArenaFallback(size_t n) {
// have any blocks yet. So we'll allocate its first block now. It must be
// big enough to host SerialArena and the pending request.
serial = SerialArena::New(
AllocateMemory(alloc_policy_.get(), 0, n + kSerialArenaSize), *this);
AllocateBlock(alloc_policy_.get(), 0, n + kSerialArenaSize), *this);

AddSerialArena(id, serial);
}
Expand Down
86 changes: 53 additions & 33 deletions src/google/protobuf/arena_cleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
namespace google {
namespace protobuf {
namespace internal {

class SerialArena;

namespace cleanup {

// Helper function invoking the destructor of `object`
Expand All @@ -33,44 +36,61 @@ void arena_destruct_object(void* object) {
// destroyed, and the function to destroy it (`destructor`)
// elem must be aligned at minimum on a 4 byte boundary.
struct CleanupNode {
  // Object to destroy.
  void* elem;
  // Function invoked with `elem` to destroy it.
  void (*destructor)(void*);

  // Destroys the object referenced by the cleanup node.
  ABSL_ATTRIBUTE_ALWAYS_INLINE void Destroy() { destructor(elem); }

  // Optimization: issues a prefetch for the object this node refers to. An
  // NTA prefetch is used on purpose so that remote caches are not polluted:
  // the object is about to be destroyed, so its cache lines have no reason to
  // linger in remote caches.
  ABSL_ATTRIBUTE_ALWAYS_INLINE void Prefetch() {
    // TODO: we should also prefetch the destructor code once
    // processors support code prefetching.
    absl::PrefetchToLocalCacheNta(elem);
  }
};

// Reinterprets the raw address `pos` as a pointer to a CleanupNode.
inline ABSL_ATTRIBUTE_ALWAYS_INLINE CleanupNode* ToCleanup(void* pos) {
  CleanupNode* const node = reinterpret_cast<CleanupNode*>(pos);
  return node;
}

// Adds a cleanup entry at memory location `pos`.
inline ABSL_ATTRIBUTE_ALWAYS_INLINE void CreateNode(void* pos, void* elem,
// Manages the list of cleanup nodes in a chunked linked list. Chunks grow by
// factors of two up to a limit. Trivially destructible, but Cleanup() must be
// called before destruction.
class ChunkList {
public:
PROTOBUF_ALWAYS_INLINE void Add(void* elem, void (*destructor)(void*),
SerialArena& arena) {
if (PROTOBUF_PREDICT_TRUE(next_ < limit_)) {
AddFromExisting(elem, destructor);
return;
}
AddFallback(elem, destructor, arena);
}

// Runs all inserted cleanups and frees allocated chunks. Must be called
// before destruction.
void Cleanup(const SerialArena& arena);

private:
struct Chunk;
friend class internal::SerialArena;

void AddFallback(void* elem, void (*destructor)(void*), SerialArena& arena);
ABSL_ATTRIBUTE_ALWAYS_INLINE void AddFromExisting(void* elem,
void (*destructor)(void*)) {
CleanupNode n = {elem, destructor};
memcpy(pos, &n, sizeof(n));
}

// Optimization: performs a prefetch on the elem for the cleanup node at `pos`.
inline ABSL_ATTRIBUTE_ALWAYS_INLINE void PrefetchNode(void* pos) {
// We explicitly use NTA prefetch here to avoid polluting remote caches: we
// are destroying these instances, there is no purpose for these cache lines
// to linger around in remote caches.
absl::PrefetchToLocalCacheNta(ToCleanup(pos)->elem);
}

// Destroys the object referenced by the cleanup node.
inline ABSL_ATTRIBUTE_ALWAYS_INLINE void DestroyNode(void* pos) {
CleanupNode* cleanup = ToCleanup(pos);
cleanup->destructor(cleanup->elem);
}

// Append in `out` the pointer to the to-be-cleaned object in `pos`.
inline void PeekNode(void* pos, std::vector<void*>& out) {
out.push_back(ToCleanup(pos)->elem);
}

// Returns the required size for a cleanup node.
constexpr ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size() {
return sizeof(CleanupNode);
}
*next_++ = CleanupNode{elem, destructor};
}

// Returns the pointers to the to-be-cleaned objects.
std::vector<void*> PeekForTesting();

Chunk* head_ = nullptr;
CleanupNode* next_ = nullptr;
CleanupNode* limit_ = nullptr;
// Current prefetch position. Data from `next_` up to but not including
// `prefetch_ptr_` is software prefetched. Used in SerialArena prefetching.
const char* prefetch_ptr_ = nullptr;
};

} // namespace cleanup
} // namespace internal
Expand Down
Loading

0 comments on commit f70d90b

Please sign in to comment.