-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrite_buffer_manager.h
176 lines (145 loc) · 5.73 KB
/
write_buffer_manager.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBufferManager is for managing memory allocation for one or more
// MemTables.
#pragma once
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <list>
#include <mutex>
#include "rocksdb/cache.h"
namespace ROCKSDB_NAMESPACE {
class CacheReservationManager;
// Interface to block and signal DB instances, intended for RocksDB
// internal use only. Each DB instance contains ptr to StallInterface.
class StallInterface {
public:
virtual ~StallInterface() {}
virtual void Block() = 0;
virtual void Signal() = 0;
};
class WriteBufferManager final {
public:
// Parameters:
// _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped.
// memory_usage() won't be valid and ShouldFlush() will always return true.
//
// cache_: if `cache` is provided, we'll put dummy entries in the cache and
// cost the memory allocated to the cache. It can be used even if _buffer_size
// = 0.
//
// allow_stall: if set true, it will enable stalling of writes when
// memory_usage() exceeds buffer_size. It will wait for flush to complete and
// memory usage to drop down.
explicit WriteBufferManager(size_t _buffer_size,
std::shared_ptr<Cache> cache = {},
bool allow_stall = false);
// No copying allowed
WriteBufferManager(const WriteBufferManager&) = delete;
WriteBufferManager& operator=(const WriteBufferManager&) = delete;
~WriteBufferManager();
// Returns true if buffer_limit is passed to limit the total memory usage and
// is greater than 0.
bool enabled() const { return buffer_size() > 0; }
// Returns true if pointer to cache is passed.
bool cost_to_cache() const { return cache_res_mgr_ != nullptr; }
// Returns the total memory used by memtables.
// Only valid if enabled()
size_t memory_usage() const {
return memory_used_.load(std::memory_order_relaxed);
}
// Returns the total memory used by active memtables.
size_t mutable_memtable_memory_usage() const {
return memory_active_.load(std::memory_order_relaxed);
}
size_t dummy_entries_in_cache_usage() const;
// Returns the buffer_size.
size_t buffer_size() const {
return buffer_size_.load(std::memory_order_relaxed);
}
void SetBufferSize(size_t new_size) {
buffer_size_.store(new_size, std::memory_order_relaxed);
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
// Check if stall is active and can be ended.
MaybeEndWriteStall();
}
// Below functions should be called by RocksDB internally.
// Should only be called from write thread
bool ShouldFlush() const {
if (enabled()) {
if (mutable_memtable_memory_usage() >
mutable_limit_.load(std::memory_order_relaxed)) {
return true;
}
size_t local_size = buffer_size();
if (memory_usage() >= local_size &&
mutable_memtable_memory_usage() >= local_size / 2) {
// If the memory exceeds the buffer size, we trigger more aggressive
// flush. But if already more than half memory is being flushed,
// triggering more flush may not help. We will hold it instead.
return true;
}
}
return false;
}
// Returns true if total memory usage exceeded buffer_size.
// We stall the writes untill memory_usage drops below buffer_size. When the
// function returns true, all writer threads (including one checking this
// condition) across all DBs will be stalled. Stall is allowed only if user
// pass allow_stall = true during WriteBufferManager instance creation.
//
// Should only be called by RocksDB internally .
bool ShouldStall() const {
if (!allow_stall_ || !enabled()) {
return false;
}
return IsStallActive() || IsStallThresholdExceeded();
}
// Returns true if stall is active.
bool IsStallActive() const {
return stall_active_.load(std::memory_order_relaxed);
}
// Returns true if stalling condition is met.
bool IsStallThresholdExceeded() const {
return memory_usage() >= buffer_size_;
}
void ReserveMem(size_t mem);
// We are in the process of freeing `mem` bytes, so it is not considered
// when checking the soft limit.
void ScheduleFreeMem(size_t mem);
void FreeMem(size_t mem);
// Add the DB instance to the queue and block the DB.
// Should only be called by RocksDB internally.
void BeginWriteStall(StallInterface* wbm_stall);
// If stall conditions have resolved, remove DB instances from queue and
// signal them to continue.
void MaybeEndWriteStall();
void RemoveDBFromQueue(StallInterface* wbm_stall);
private:
std::atomic<size_t> buffer_size_;
std::atomic<size_t> mutable_limit_;
std::atomic<size_t> memory_used_;
// Memory that hasn't been scheduled to free.
std::atomic<size_t> memory_active_;
std::shared_ptr<CacheReservationManager> cache_res_mgr_;
// Protects cache_res_mgr_
std::mutex cache_res_mgr_mu_;
std::list<StallInterface*> queue_;
// Protects the queue_ and stall_active_.
std::mutex mu_;
bool allow_stall_;
// Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
// while holding mu_, but it can be read without a lock.
std::atomic<bool> stall_active_;
void ReserveMemWithCache(size_t mem);
void FreeMemWithCache(size_t mem);
};
} // namespace ROCKSDB_NAMESPACE