-
Notifications
You must be signed in to change notification settings - Fork 43
/
mergeStats.h
303 lines (283 loc) · 12 KB
/
mergeStats.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
/*
* mergeStats.h
*
* Copyright 2010-2012 Yahoo! Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Created on: Apr 27, 2010
* Author: sears
*/
#ifndef MERGESTATS_H_
#define MERGESTATS_H_
#include <stasis/common.h>
#define EXTENDED_STATS 1
#include <sys/time.h>
#include <stdio.h>
#include "dataTuple.h"
#include "dataPage.h"
#include <mergeManager.h> // XXX for double_to_ts, etc... create a util class.
#include <stasis/transactional.h>
class mergeStats {
private:
void init_helper(void) {
#if EXTENDED_STATS
gettimeofday(&stats_sleep,0);
gettimeofday(&stats_start,0);
gettimeofday(&stats_done,0);
struct timeval last;
gettimeofday(&last,0);
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
}
pageid_t rb_size_estimator(pageid_t num_bytes, pageid_t num_tuples) {
// Experimentally determined numbers
// pageid_t small_tup_est = num_bytes + 110L * num_tuples;
// pageid_t big_tup_est = num_bytes + (2L * num_bytes) / 100L;
// Pessimistic numbers
pageid_t small_tup_est = num_bytes + 220L * num_tuples;
pageid_t big_tup_est = num_bytes + (4L * num_bytes) / 100L;
return big_tup_est > small_tup_est ? big_tup_est : small_tup_est; // max
}
public:
mergeStats(int merge_level, int64_t target_size) :
merge_level(merge_level),
base_size(0),
mergeable_size(0),
target_size(target_size),
bytes_out(0),
bytes_in_small(0),
bytes_in_large(0),
num_tuples_out(0),
num_tuples_in_small(0),
num_tuples_in_large(0),
just_handed_off(false),
delta(0),
need_tick(0),
in_progress(0),
out_progress(0),
active(false)
#if EXTENDED_STATS
,
stats_merge_count(0),
stats_bytes_out_with_overhead(0),
stats_num_datapages_out(0),
stats_bytes_in_small_delta(0),
stats_lifetime_elapsed(0),
stats_lifetime_active(0),
stats_elapsed(0),
stats_active(0),
stats_lifetime_consumed(0),
stats_bps(10.0*1024.0*1024.0)
#endif // EXTENDED_STATS
{
init_helper();
}
mergeStats(int xid, recordid rid) {
marshalled_header h;
Tread(xid, rid, &h);
merge_level = h.merge_level;
base_size = h.base_size;
mergeable_size = h.mergeable_size;
target_size = h.target_size;
bytes_out = base_size;
bytes_in_small = 0;
bytes_in_large = 0;
num_tuples_out = 0;
num_tuples_in_small = 0;
num_tuples_in_large = 0;
just_handed_off= false;
delta = 0;
need_tick = 0;
in_progress = 0;
out_progress = ((double)base_size) / (double)target_size;
active = false;
#if EXTENDED_STATS
stats_merge_count = 0;
stats_bytes_out_with_overhead = 0;
stats_num_datapages_out = 0;
stats_bytes_in_small_delta = 0;
stats_lifetime_elapsed = 0;
stats_lifetime_active = 0;
stats_elapsed = 0;
stats_active = 0;
stats_lifetime_consumed = 0;
stats_bps = 10.0*1024.0*1024.0;
#endif
init_helper();
}
recordid talloc(int xid) {
return Talloc(xid, sizeof(marshalled_header));
}
void marshal(int xid, recordid rid) {
marshalled_header h;
h.merge_level = merge_level;
h.base_size = base_size;
h.mergeable_size = mergeable_size;
h.target_size = h.target_size;
Tset(xid, rid, &h);
}
~mergeStats() { }
void new_merge2() {
if(just_handed_off) {
bytes_out = 0;
out_progress = 0;
just_handed_off = false;
}
base_size = bytes_out;
bytes_out = 0;
bytes_in_small = 0;
bytes_in_large = 0;
num_tuples_out = 0;
num_tuples_in_small = 0;
num_tuples_in_large = 0;
in_progress = 0;
#if EXTENDED_STATS
stats_merge_count++;
stats_bytes_out_with_overhead = 0;
stats_num_datapages_out = 0;
stats_bytes_in_small_delta = 0;
#endif
}
void starting_merge() {
active = true;
#if EXTENDED_STATS
gettimeofday(&stats_start, 0);
struct timeval last;
gettimeofday(&last, 0);
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
}
pageid_t get_current_size() {
if(merge_level == 0) {
return rb_size_estimator(base_size + bytes_in_small - bytes_in_large - bytes_out,
/*num_tuples_base + */ num_tuples_in_small - num_tuples_in_large - num_tuples_out);;
} else {
// s->bytes_out has strange semantics. It's how many bytes our input has written into this tree.
return base_size + bytes_out - bytes_in_large;
}
}
void handed_off_tree() {
if(merge_level == 2) {
} else {
mergeable_size = get_current_size();
just_handed_off = true;
}
}
void merged_tuples(dataTuple * merged, dataTuple * small, dataTuple * large) {
}
void wrote_datapage(dataPage *dp) {
#if EXTENDED_STATS
stats_num_datapages_out++;
stats_bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
#endif
}
pageid_t output_size() {
return bytes_out;
}
protected:
double float_tv(struct timeval& tv) {
return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0;
}
friend class mergeManager;
protected:
struct marshalled_header {
int merge_level;
pageid_t base_size;
pageid_t mergeable_size;
pageid_t target_size; // Needed?
};
public: // XXX eliminate public fields; these are still required because various bits of calculation (bloom filter size, estimated c0 run length, etc...) are managed outside of mergeManager.
int merge_level; /// The tree component / merge level that we're tracking. 1 => C0->C1, 2 => C1->C2
pageid_t base_size; /// size of existing tree component (c[merge_level]') at beginning of current merge.
protected:
pageid_t mergeable_size; /// The size of c[merge_level]_mergeable, assuming it exists. Protected by mutex.
public:
pageid_t target_size; /// How big should the c[merge_level] tree component be?
protected:
pageid_t bytes_out; /// For C0, number of bytes consumed by downstream merger. For merge_level 1 and 2, number of bytes enqueued for the downstream (C1-C2, and nil) mergers.
public:
pageid_t bytes_in_small; /// For C0, number of bytes inserted by application. For C1, C2, number of bytes read from small tree in C(n-1) - Cn merger.
protected:
pageid_t bytes_in_large; /// Bytes from the large input? (for C0, bytes deleted due to updates)
pageid_t num_tuples_out; /// How many tuples did we write? TODO Only used for C0, so not stored on disk.
pageid_t num_tuples_in_small; /// Tuples from the small input? TODO Only used for C0, so not stored on disk.
pageid_t num_tuples_in_large; /// Tuples from large input? TODO Only used for C0, so not stored on disk.
// todo: simplify confusing hand off logic, and remove this field?
bool just_handed_off;
// These fields are used to amortize mutex acquisitions.
int delta;
int need_tick;
// todo in_progress and out_progress are derived fields. eliminate them?
double in_progress;
double out_progress;
bool active; /// True if this merger is running, or blocked by rate limiting. False if the upstream input does not exist.
#if EXTENDED_STATS
pageid_t stats_merge_count; /// This is the stats_merge_count'th merge
struct timeval stats_sleep; /// When did we go to sleep waiting for input?
struct timeval stats_start; /// When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep)
struct timeval stats_done; /// When did we finish merging?
struct timespec stats_last_tick;
pageid_t stats_bytes_out_with_overhead;/// How many bytes did we write (including internal tree nodes)?
pageid_t stats_num_datapages_out; /// How many datapages?
pageid_t stats_bytes_in_small_delta; /// How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
double stats_lifetime_elapsed; /// How long has this tree existed, in seconds?
double stats_lifetime_active; /// How long has this tree been running (i.e.; active = true), in seconds?
double stats_elapsed; /// How long did this merge take, including idle time (not valid until after merge is complete)?
double stats_active; /// How long did this merge take once it started running?
double stats_lifetime_consumed; /// How many bytes has this tree consumed from upstream mergers?
double stats_bps; /// Effective throughput while active.
#endif
public:
void pretty_print(FILE* fd) {
#if EXTENDED_STATS
double sleep_time = stats_elapsed - stats_active;
double work_time = stats_active;
double total_time = stats_elapsed;
double mb_out = ((double)bytes_out) /(1024.0*1024.0);
double phys_mb_out = ((double)stats_bytes_out_with_overhead) / (1024.0 * 1024.0);
double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0);
double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0);
double kt_out = ((double)num_tuples_out) /(1024.0);
double kt_ins= ((double)num_tuples_in_small) /(1024.0);
double kt_inl = ((double)num_tuples_in_large) /(1024.0);
double mb_hdd = mb_out + mb_inl + (merge_level == 1 ? 0.0 : mb_ins);
double kt_hdd = kt_out + kt_inl + (merge_level == 1 ? 0.0 : kt_ins);
fprintf(fd,
"=====================================================================\n"
"Thread %d merge %lld: sleep %6.2f sec, run %6.2f sec\n"
" megabytes kTuples datapages MB/s (real) kTup/s (real)\n"
"Wrote %7lld %7lld %9lld" " %6.1f %6.1f" " %8.1f %8.1f" "\n"
"Read (small) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
"Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
"Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
".....................................................................\n"
"avg tuple len: %6.2fKB w/ disk ovehead: %6.2fKB\n"
"effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n"
" (%.2f; %.2f) wallclock" "\n"
".....................................................................\n"
,
merge_level, stats_merge_count,
sleep_time,
work_time,
(long long)mb_out, (long long)kt_out, stats_num_datapages_out, mb_out / work_time, mb_out / total_time, kt_out / work_time, kt_out / total_time,
(long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / work_time, kt_ins / total_time,
(long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time,
(long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time,
mb_out / kt_out, phys_mb_out / kt_out,
mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins
);
#endif
}
};
#endif /* MERGESTATS_H_ */