
Automatically adjust search threadpool queue_size #23884

Merged (1 commit) on May 16, 2017

Conversation

@dakrone (Member, PR author) commented Apr 3, 2017

This PR adds a new thread pool type: fixed_auto_queue_size. This thread pool
behaves like a regular fixed threadpool, except that after every
auto_queue_frame_size operations (default: 10,000) executed in the pool,
Little's Law is used to recalculate an optimal queue size, and the pool's
queue_size is adjusted up or down by 25. A configurable minimum and maximum
are also enforced.

The SEARCH threadpool is changed to use this new type of thread pool.

Relates to #3890
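For readers skimming the description, here is a rough, hypothetical sketch of the adjustment step (not the actual QueueResizingEsThreadPoolExecutor code; the class name, field names, and the fixed ±25 step are illustrative only):

    // Simplified sketch: once per frame of completed tasks, estimate the optimal
    // queue size via Little's Law (L = λ * W) and nudge the current capacity
    // toward it by a fixed step, clamped to [minCapacity, maxCapacity].
    final class AutoQueueSketch {
        private final int tasksPerFrame = 10_000; // auto_queue_frame_size
        private final int step = 25;              // adjustment per frame
        private final int minCapacity = 10;
        private final int maxCapacity = 1000;
        private int capacity = 500;               // current queue_size

        // totalTaskNanos: summed queue + execution time of the frame's tasks
        // frameNanos: wall-clock time the frame took
        int adjust(long totalTaskNanos, long frameNanos) {
            final double lambda = (double) tasksPerFrame / frameNanos; // arrival rate (tasks/ns)
            final double w = (double) totalTaskNanos / tasksPerFrame;  // avg time per task (ns)
            final int optimal = (int) (lambda * w);                    // L = λ * W
            final int delta = optimal > capacity ? step : -step;
            capacity = Math.min(maxCapacity, Math.max(minCapacity, capacity + delta));
            return capacity;
        }
    }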

@dakrone commented Apr 3, 2017

I think there is more testing I can add to this (there are already unit tests), and I need to benchmark the difference between this and normal execution, but I wanted to get comments on the implementation earlier rather than later.

@clintongormley (Contributor) commented:

Out of interest, why did you choose 10,000 and 25? My gut feeling would be to adjust more frequently and by a smaller delta, but that's a complete guess on my part.

@dakrone commented Apr 3, 2017

> Out of interest, why did you choose 10,000 and 25?

For the window size (10,000), I wanted a window large enough that a single operation doesn't skew the adjustment too much. Since a large number of operations hit the search threadpool for each search (a task for each shard, plus the coordinating task), the window should be reached fairly quickly (the "should" is another reason why it's configurable).

For the adjustment size, I picked it arbitrarily. The current values for the search threadpool are initial_size: 500, min: 10, max: 1000, and I wanted the adjustments to be gradual, but I don't have any real numbers behind it other than 25 seeming like a nice number. I'm happy to change it :)

@jasontedor self-requested a review on April 4, 2017 at 01:08
@dakrone commented Apr 6, 2017

Here are some benchmarks run between master and the auto-queue-size branch:

Lap Metric Operation Master Auto-queue Difference (Autoqueue - Master) Unit
All Indexing time 68.0937 68.5317 0.438 min
All Merge time 41.5077 42.9774 1.4697 min
All Refresh time 4.8575 4.99223 0.13473 min
All Flush time 2.33533 2.1765 -0.15883 min
All Merge throttle time 23.3773 25.7603 2.383 min
All Median CPU usage 29 105.9 76.9 %
All Total Young Gen GC 14.502 15.264 0.762 s
All Total Old Gen GC 6.085 5.925 -0.16 s
All Index size 20.4374 20.4629 0.0255 GB
All Totally written 112.509 111.564 -0.945 GB
All Heap used for segments 21.8544 22.6236 0.7692 MB
All Heap used for doc values 0.109623 0.140736 0.031113 MB
All Heap used for terms 19.8273 20.564 0.7367 MB
All Heap used for norms 0.039978 0.0393677 -0.0006103 MB
All Heap used for points 0.00909424 0.00926304 0.0001688 MB
All Heap used for stored fields 1.86841 1.87019 0.00178 MB
All Segment count 131 129 -2
All Min Throughput index-append 1586.15 1541.51 -44.64 docs/s
All Median Throughput index-append 1624.31 1570.34 -53.97 docs/s
All Max Throughput index-append 1668.83 1604.92 -63.91 docs/s
All 50.0th percentile latency index-append 2337.08 2408.27 71.19 ms
All 90.0th percentile latency index-append 3589.17 3536.83 -52.34 ms
All 99.0th percentile latency index-append 4491.64 4689.56 197.92 ms
All 100th percentile latency index-append 5063.8 5228.03 164.23 ms
All 50.0th percentile service time index-append 2337.08 2408.27 71.19 ms
All 90.0th percentile service time index-append 3589.17 3536.83 -52.34 ms
All 99.0th percentile service time index-append 4491.64 4689.56 197.92 ms
All 100th percentile service time index-append 5063.8 5228.03 164.23 ms
All error rate index-append 0 0 0 %
All Min Throughput force-merge 0.197267 0.226305 0.029038 ops/s
All Median Throughput force-merge 0.197267 0.226305 0.029038 ops/s
All Max Throughput force-merge 0.197267 0.226305 0.029038 ops/s
All 100th percentile latency force-merge 5069.25 4418.81 -650.44 ms
All 100th percentile service time force-merge 5069.25 4418.81 -650.44 ms
All error rate force-merge 0 0 0 %
All Min Throughput index-stats 50.2843 50.2902 0.0059 ops/s
All Median Throughput index-stats 50.3501 50.3517 0.0016 ops/s
All Max Throughput index-stats 50.4159 50.4132 -0.0027 ops/s
All 50.0th percentile latency index-stats 3.46025 2.93639 -0.52386 ms
All 90.0th percentile latency index-stats 3.97917 3.44849 -0.53068 ms
All 99.0th percentile latency index-stats 4.49356 3.80028 -0.69328 ms
All 100th percentile latency index-stats 9.53015 4.2343 -5.29585 ms
All 50.0th percentile service time index-stats 3.37462 2.85683 -0.51779 ms
All 90.0th percentile service time index-stats 3.89758 3.36224 -0.53534 ms
All 99.0th percentile service time index-stats 4.39869 3.73934 -0.65935 ms
All 100th percentile service time index-stats 9.4488 4.16472 -5.28408 ms
All error rate index-stats 0 0 0 %
All Min Throughput node-stats 50.2722 50.2717 -0.0005 ops/s
All Median Throughput node-stats 50.3502 50.3471 -0.0031 ops/s
All Max Throughput node-stats 50.4281 50.4226 -0.0055 ops/s
All 50.0th percentile latency node-stats 3.36706 3.37268 0.00562 ms
All 90.0th percentile latency node-stats 3.82163 3.81749 -0.00414 ms
All 99.0th percentile latency node-stats 5.78723 4.9849 -0.80233 ms
All 100th percentile latency node-stats 6.83571 5.62278 -1.21293 ms
All 50.0th percentile service time node-stats 3.27721 3.2767 -0.00051 ms
All 90.0th percentile service time node-stats 3.72936 3.72079 -0.00857 ms
All 99.0th percentile service time node-stats 5.72494 4.87053 -0.85441 ms
All 100th percentile service time node-stats 6.73225 5.52148 -1.21077 ms
All error rate node-stats 0 0 0 %
All Min Throughput default 20.0099 20.0108 0.0009 ops/s
All Median Throughput default 20.0148 20.0161 0.0013 ops/s
All Max Throughput default 20.0325 20.0321 -0.0004 ops/s
All 50.0th percentile latency default 13.6392 10.1922 -3.447 ms
All 90.0th percentile latency default 14.257 10.6182 -3.6388 ms
All 99.0th percentile latency default 18.04 12.0561 -5.9839 ms
All 99.9th percentile latency default 19.5035 18.18 -1.3235 ms
All 100th percentile latency default 20.1269 18.2644 -1.8625 ms
All 50.0th percentile service time default 13.5183 10.075 -3.4433 ms
All 90.0th percentile service time default 14.1481 10.4997 -3.6484 ms
All 99.0th percentile service time default 17.9333 11.9286 -6.0047 ms
All 99.9th percentile service time default 19.4008 18.079 -1.3218 ms
All 100th percentile service time default 20.007 18.1255 -1.8815 ms
All error rate default 0 0 0 %
All Min Throughput term 20.0115 20.011 -0.0005 ops/s
All Median Throughput term 20.0172 20.0163 -0.0009 ops/s
All Max Throughput term 20.0338 20.0324 -0.0014 ops/s
All 50.0th percentile latency term 7.92833 9.35908 1.43075 ms
All 90.0th percentile latency term 8.44468 9.78134 1.33666 ms
All 99.0th percentile latency term 9.18559 10.8609 1.67531 ms
All 99.9th percentile latency term 15.018 15.6326 0.6146 ms
All 100th percentile latency term 15.9561 18.3146 2.3585 ms
All 50.0th percentile service time term 7.81303 9.24117 1.42814 ms
All 90.0th percentile service time term 8.33054 9.66676 1.33622 ms
All 99.0th percentile service time term 9.08257 10.7349 1.65233 ms
All 99.9th percentile service time term 14.9163 15.5288 0.6125 ms
All 100th percentile service time term 15.8548 18.2458 2.391 ms
All error rate term 0 0 0 %
All Min Throughput phrase 20.0111 20.0111 0. ops/s
All Median Throughput phrase 20.0169 20.0167 -0.0002 ops/s
All Max Throughput phrase 20.0331 20.0329 -0.0002 ops/s
All 50.0th percentile latency phrase 8.46125 8.65166 0.19041 ms
All 90.0th percentile latency phrase 9.07275 9.08076 0.00801 ms
All 99.0th percentile latency phrase 10.4592 11.426 0.9668 ms
All 99.9th percentile latency phrase 14.9159 14.393 -0.5229 ms
All 100th percentile latency phrase 17.1106 17.1545 0.0439 ms
All 50.0th percentile service time phrase 8.358 8.54007 0.18207 ms
All 90.0th percentile service time phrase 8.95422 8.96014 0.00592 ms
All 99.0th percentile service time phrase 10.3529 11.2965 0.9436 ms
All 99.9th percentile service time phrase 14.777 14.2928 -0.4842 ms
All 100th percentile service time phrase 17.0026 17.0348 0.0322 ms
All error rate phrase 0 0 0 %
All Min Throughput articles_monthly_agg_uncached 20.0079 20.0061 -0.0018 ops/s
All Median Throughput articles_monthly_agg_uncached 20.0108 20.012 0.0012 ops/s
All Max Throughput articles_monthly_agg_uncached 20.0249 20.0255 0.0006 ops/s
All 50.0th percentile latency articles_monthly_agg_uncached 19.0843 17.9412 -1.1431 ms
All 90.0th percentile latency articles_monthly_agg_uncached 26.2466 27.9395 1.6929 ms
All 99.0th percentile latency articles_monthly_agg_uncached 26.9635 28.5009 1.5374 ms
All 99.9th percentile latency articles_monthly_agg_uncached 32.4034 33.7226 1.3192 ms
All 100th percentile latency articles_monthly_agg_uncached 43.9459 34.4305 -9.5154 ms
All 50.0th percentile service time articles_monthly_agg_uncached 18.9682 17.8266 -1.1416 ms
All 90.0th percentile service time articles_monthly_agg_uncached 26.1301 27.8185 1.6884 ms
All 99.0th percentile service time articles_monthly_agg_uncached 26.838 28.3926 1.5546 ms
All 99.9th percentile service time articles_monthly_agg_uncached 32.289 33.5961 1.3071 ms
All 100th percentile service time articles_monthly_agg_uncached 41.6071 34.3379 -7.2692 ms
All error rate articles_monthly_agg_uncached 0 0 0 %
All Min Throughput articles_monthly_agg_cached 20.0128 20.0128 0. ops/s
All Median Throughput articles_monthly_agg_cached 20.0191 20.0191 0. ops/s
All Max Throughput articles_monthly_agg_cached 20.0377 20.0378 0.0001 ops/s
All 50.0th percentile latency articles_monthly_agg_cached 2.74258 2.71849 -0.02409 ms
All 90.0th percentile latency articles_monthly_agg_cached 2.88237 2.83613 -0.04624 ms
All 99.0th percentile latency articles_monthly_agg_cached 3.36458 3.15943 -0.20515 ms
All 99.9th percentile latency articles_monthly_agg_cached 8.57833 9.23708 0.65875 ms
All 100th percentile latency articles_monthly_agg_cached 10.9329 9.97416 -0.95874 ms
All 50.0th percentile service time articles_monthly_agg_cached 2.62435 2.60158 -0.02277 ms
All 90.0th percentile service time articles_monthly_agg_cached 2.76292 2.71849 -0.04443 ms
All 99.0th percentile service time articles_monthly_agg_cached 3.23411 3.0395 -0.19461 ms
All 99.9th percentile service time articles_monthly_agg_cached 8.48554 9.11497 0.62943 ms
All 100th percentile service time articles_monthly_agg_cached 10.8651 9.84902 -1.01608 ms
All error rate articles_monthly_agg_cached 0 0 0 %
All Min Throughput scroll 1.17592 3.10634 1.93042 ops/s
All Median Throughput scroll 1.47814 3.13217 1.65403 ops/s
All Max Throughput scroll 1.53707 3.17197 1.6349 ops/s
All 50.0th percentile latency scroll 1.60029e+06 703378 -896912. ms
All 90.0th percentile latency scroll 2.77393e+06 972316 -1801614. ms
All 99.0th percentile latency scroll 3.01257e+06 1.02698e+06 -1985590. ms
All 100th percentile latency scroll 3.04e+06 1.03323e+06 -2006770. ms
All 50.0th percentile service time scroll 21526.6 7796.78 -13729.82 ms
All 90.0th percentile service time scroll 29557 8334.84 -21222.16 ms
All 99.0th percentile service time scroll 57034.9 8845.59 -48189.31 ms
All 100th percentile service time scroll 57492.6 8876.95 -48615.65 ms
All error rate scroll 0 0 0 %

The query performance looks extremely similar, with very slightly higher latency on the auto-queueing branch, ~1ms for term queries and <1ms for phrase queries.

@s1monw (Contributor) left a review:

I did a first pass and I really like it so far. There are some structural issues (nitpicks) and a bunch of questions, but it looks pretty good IMO. I am not so sure about the adjustment size of 25, but I might be missing something. What I'd love to see is a benchmark where a node is under load, so we can see how we behave with and without the new queue implementation.

int frameSize,
ThreadFactory threadFactory,
ThreadContext contextHolder) {
assert initialQueueCapacity > 0 : "initial queue capacity must be greater than 0";
Contributor:

make this a hard check please

@@ -78,6 +78,23 @@ public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapa
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
}

public static EsThreadPoolExecutor newAutoQueueFixed(String name,
Contributor:

this one needs javadocs and you can go back to 140 chars


private long startNs;

QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize,
Contributor:

this file can go back to 140 chars as well...

// total runtime, rather than a fixed interval.

// λ = tasks per time period - total tasks divided by measurement time
final double lambda = (double) taskCount / totalRuntime;
Contributor:

it's hard to tell what totalRuntime is and if we'd need any error correction here ie. if totalRuntime can grow much bigger than task count. My intuition says no but I'd appreciate some docs that explain the scale of these parameters or what they are (totalRuntime in nanos? millis?)

final double w = totalTaskNanos / taskCount;

// L = λ * W
final int l = (int) (lambda * w);
Contributor:

can we use Math.toIntExact here when we cast to long first?
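For context, Math.toIntExact throws an ArithmeticException instead of silently truncating, so the suggestion would look roughly like this (a sketch, reusing the lambda and w names from the snippet above):

    // Cast the product to long first, then convert with an overflow check.
    final long optimalL = (long) (lambda * w);   // L = λ * W
    final int l = Math.toIntExact(optimalL);     // throws ArithmeticException on int overflow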

final int newCapacity =
Math.min(maxCapacity, Math.max(minCapacity, capacity + finalAdjustment));
this.capacity = newCapacity;
logger.debug("adjusting queue by [{}], new capacity: [{}]", finalAdjustment, this.capacity);
Contributor:

again, can we log this on the consumer level if at all?

}

public long getTotalNanos() {
if (finishTimeNanos == -1) {
Contributor:

should we just fail if this is the case? I mean we should not call it in that case right?

Member Author (dakrone):

I think TimedRunnable is a better generic wrapper this way, because it allows checking the task time before the task is actually complete (returning -1) rather than blowing up. If you think it should be used only for this calculation, though, I can change it to a hard error.

Contributor:

well then document this? I think it should have a javadoc? Also does it need to be public?

* A class used to wrap a {@code Runnable} that allows capturing the time the task takes from creation
* through execution.
*/
public class TimedRunnable implements Runnable {
Contributor:

also can this class be final and pkg private? we should not leak it?!


/**
* A builder for executors that automatically adjust the queue length as needed, depending on
* Little's Law.
Contributor:

I wonder if a wikipedia link or something would be useful.

info.getQueueSize() == null ? "unbounded" : info.getQueueSize());
}

static class AutoExecutorSettings extends ExecutorBuilder.ExecutorSettings {
Contributor:

can be final too?

@clintongormley added the :Internal and :Search/Search (Search-related issues that do not fall into other categories) labels on Apr 13, 2017
@dakrone commented Apr 14, 2017

Thanks for the review @s1monw! I've pushed a lot of changes addressing your feedback.

> I am not so sure about the adjustment size of 25, but I might be missing something.

You're not missing anything at all, I picked it arbitrarily. I was thinking of a better idea, though: how about 1/10th of the difference between min and max? That way, no matter how large the range or the current queue size, it only takes 10 adjustments to reach either the min or the max.

So with min=10, max=1000, initial=500 the adjustment would be (1000-10)/10 = 99. What do you think?

> What I'd love to see is a benchmark where a node is under load, so we can see how we behave with and without the new queue implementation.

Great idea! I can work on this.

@s1monw commented Apr 18, 2017

> You're not missing anything at all, I picked it arbitrarily. I was thinking of a better idea, though: how about 1/10th of the difference between min and max? That way, no matter how large the range or the current queue size, it only takes 10 adjustments to reach either the min or the max.

that seems like a good first step.

I also wonder what the default is for the frame_size? I looked but missed it.

@dakrone commented Apr 18, 2017

> that seems like a good first step.

Sounds good, I'll work on that!

> I also wonder what the default is for the frame_size? I looked but missed it.

The default is 10,000, but this is configurable in the settings

@s1monw commented Apr 18, 2017

> The default is 10,000, but this is configurable in the settings

I feel we shouldn't change the default of the search threadpool in this PR. Let's keep it at 1k?

@s1monw (Contributor) left a review:

I left some comments; it's looking good so far! I think you can remove the WIP label?

return this.capacity;
}

int diff = optimalCapacity - this.capacity;
Contributor:

If you do this, make sure it has a comment. I am OK with adding a static method that delegates and has an assertion in it with a suppression.


* Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests}
* based on the time taken for each event.
*/
@TestLogging("_root:DEBUG")
Contributor:

do we need this?

For count/search/suggest operations. Thread pool type is
`fixed_auto_queue_size` with a size of
`int((# of available_processors * 3) / 2) + 1`, and initial queue_size of
`500`.
Contributor:

should we stick to the size we had or why did you change this?

queue_size: 500
min_queue_size: 10
max_queue_size: 1000
auto_queue_frame_size: 10000
Contributor:

I wonder if this is too much by default; maybe go with 1k for a start?

@LucaWintergerst (Contributor) commented:

Can one of you share why we apply this to the search threadpool first, and also whether we have similar plans for bulk?

@s1monw commented Apr 19, 2017

> Can one of you share why we apply this to the search threadpool first, and also whether we have similar plans for bulk?

@LucaWintergerst to be honest, I am leaning towards not even applying it to the search thread-pool in this PR, but in a follow-up. We are just introducing this feature, and it's not considered good practice to change more than one variable in a system when you want to observe a behavioral change. Search is also very different from indexing, since a search request can be retried on a replica; for indexing this is not possible. To make this work well for indexing we also need more benchmarks and test infrastructure, so I don't think this will be applied to indexing by default before we have production feedback from the real world on this feature for search. I hope this makes sense.

@dakrone commented Apr 19, 2017

> The default is 10,000, but this is configurable in the settings

> I feel we shouldn't change the default of the search threadpool in this PR. Let's keep it at 1k?

I think 1k is a little too small (only ~166 queries for a single ES node), so I changed the default to 2k; does that work? I also reset the search defaults back to an initial size of 1000 (like it was before) with a min of 10 and max of 2000.

> I left some comments; it's looking good so far! I think you can remove the WIP label?

Thanks! I removed the WIP label

@s1monw (Contributor) left a review:

Code-wise I think we are ready to go. I still want to see benchmarks/tests showing how much better we do compared to non-autoscaling when a node is slow, as well as how quickly the queue length recovers when the node gets healthy again.

A general thought: I wonder if we should make the window size a function of the queue length, since with shorter queues we should check more often whether we need to make the queue longer?

@@ -69,6 +68,7 @@
this.startNs = System.nanoTime();
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.queueAdjustmentAmount = (maxQueueSize - minQueueSize) / 10;
Contributor:

Can we have a check that max > min? Make it a hard check and throw an IAE.
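A sketch of the kind of hard check being asked for (placement, wording, and the exact bound are illustrative; the thread later settles on allowing min == max as a fallback to a plain fixed executor):

    if (maxQueueSize < minQueueSize) {
        throw new IllegalArgumentException("max queue size [" + maxQueueSize
                + "] must be greater than or equal to min queue size [" + minQueueSize + "]");
    }
    this.queueAdjustmentAmount = (maxQueueSize - minQueueSize) / 10;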

* A class used to wrap a {@code Runnable} that allows capturing the time the task takes from creation
* through execution.
*/
class TimedRunnable implements Runnable {
Contributor:

make it final?

Member Author (dakrone):

I extend TimedRunnable in QueueResizingEsThreadPoolExecutorTests as SettableTimedRunnable so I can substitute the time taken for a task without Thread.sleep, so I can't make this final

Contributor:

fair enough

final String maxSizeKey = settingsKey(prefix, "max_queue_size");
final String frameSizeKey = settingsKey(prefix, "auto_queue_frame_size");
this.queueSizeSetting = Setting.intSetting(queueSizeKey, initialQueueSize, Setting.Property.NodeScope);
this.minQueueSizeSetting = Setting.intSetting(minSizeKey, minQueueSize, Setting.Property.NodeScope);
Contributor:

I think we should add custom validators/parsers to make sure that min <= max at the settings-parsing level. I know they have a cyclic dependency, so I think you need to create two instances of each, one for the validation and one for the actual registration; I think it's worth it.

return Math.abs(optimalCapacity - this.capacity);
}

int getCapacity() {
Contributor:

There is a public int capacity() in the super class; isn't this enough?

* Extends the {@code SizeBlockingQueue} to add the {@code adjustCapacity} method, which will adjust
* the capacity by a certain amount towards a maximum or minimum.
*/
final class ResizableSizeBlockingQueue<E> extends SizeBlockingQueue<E> {
Contributor:

should we also override public int remainingCapacity() with:

    public int remainingCapacity() {
        return Math.max(0, super.remainingCapacity());
    }

@jasontedor (Member) left a review:

Okay, I reviewed this! I like it. I left a bunch of comments though. Two things I noticed that I'm not sure where to comment:

  • the naming of frame/window is inconsistent, you use both
  • the naming of queue resizing executor, auto queue adjusting executor, and EsExecutors#newAutoQueueFixed are inconsistent with each other, can we get them in sync?

@@ -78,6 +78,29 @@ public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapa
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
}

/**
* Return a new executor that will automatically adjust the queue size
Member:

Add "based on queue throughput"?

public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
int maxQueueSize, int frameSize, ThreadFactory threadFactory,
ThreadContext contextHolder) {
if (initialQueueCapacity < 1) {
Member:

These always read clearer to me as <= 0.

int maxQueueSize, int frameSize, ThreadFactory threadFactory,
ThreadContext contextHolder) {
if (initialQueueCapacity < 1) {
throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be greater than 0");
Member:

Must be positive? Include the illegal value?
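A sketch of the wording being suggested (illustrative only, not the final diff):

    if (initialQueueCapacity <= 0) {
        throw new IllegalArgumentException("initial queue capacity for [" + name
                + "] executor must be positive, got: " + initialQueueCapacity);
    }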

}
ResizableSizeBlockingQueue<Runnable> queue =
new ResizableSizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
initialQueueCapacity);
Member:

It looks like this could fit in 140 columns.

private final int tasksPerWindow;
private final int minQueueSize;
private final int maxQueueSize;
/** The amount the queue size is adjusted by for each calculation */
Member:

This should be a regular comment and not a Javadoc.

int maxQueueSize = settings.maxQueueSize;
int frameSize = settings.frameSize;
final ThreadFactory threadFactory =
EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
Member:

One line.

this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
assert executor instanceof QueueResizingEsThreadPoolExecutor :
"this info class is only for queue resizing excutors";
Member:

One line.

@@ -171,7 +172,10 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
// The search threadpool's queue will automatically be adjusted. It starts initially at 1000
// with a minimum of 10 and maximum of 2000. The default frame size is 2000 operations
Member:

I appreciate code comments, but I think this one can be dropped. It doesn't communicate anything that the code doesn't already show, and it's at risk of becoming inconsistent with the code over time, as it happens so frequently that the code is changed but the comments aren't updated. What I would support, if you think this comment is essential, is:

builders.put(
                Names.SEARCH,
                new AutoQueueAdjustingExecutorBuilder(
                        settings,
                        Names.SEARCH,
                        searchThreadPoolSize(availableProcessors),
                        1000 /* initial queue size */,
                        10 /* min queue size */,
                        2000 /* max queue size */,
                        2000 /* frame size */));

but even this I think is unnecessary.

// do this at the end so there is no concurrent adjustments happening. We also
// decrement them instead of resetting them back to zero, as
int tasks = taskCount.addAndGet(-this.tasksPerWindow);
assert tasks >= 0 : "tasks should never less than 0";
Member:

Add the offending value? If this breaks, it's going to be in tests of concurrent code where it will possibly not reproduce easily, so any hint as to the problem will be welcome.
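A sketch of the assertion with the offending value included (illustrative):

    int tasks = taskCount.addAndGet(-this.tasksPerWindow);
    assert tasks >= 0 : "tasks should never be less than 0, got: " + tasks;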

* Extends the {@code SizeBlockingQueue} to add the {@code adjustCapacity} method, which will adjust
* the capacity by a certain amount towards a maximum or minimum.
*/
final class ResizableSizeBlockingQueue<E> extends SizeBlockingQueue<E> {
Member:

I think that we can probably drop the second size from the name, make it less verbose.

@jasontedor (Member) left a review:

I left two more comments, sorry, still very much thinking about this one.

assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
if (taskCount.incrementAndGet() == this.tasksPerWindow) {
Member:

I'm not convinced of the soundness of the bookkeeping here. I think that totalNanos can include the total task time for more than tasksPerWindow tasks.

Member Author (dakrone):

How so? I am trying to think of how it would do that, but I wasn't able to come up with anything. Do you have a specific scenario in mind?

// A task has been completed, it has left the building. We should now be able to get the
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when `t` is null.
Member:

I'm not convinced we should ignore failures. These tasks still occupy queue capacity, and there is no guarantee they failed quickly; a thread can be executing for a while before failing.

Member Author (dakrone):

That's a good point, I'll remove the null check and use all the runnables as tasks

@dakrone commented Apr 28, 2017

(Heads up for anyone following this PR, I am currently testing this in a real-world scenario and adjusting/changing things accordingly)

@dakrone commented May 2, 2017

Okay, I ran some benchmarks with this change and found a number of interesting things, some of which caused me to rethink the design; keep reading below for the update.

I tested this on a 6-node cluster with 5 extremely beefy data nodes (32 CPUs, 64gb RAM, JVM heap at 30gb) and a single client node (to which all the requests were sent). The cluster was loaded with 120 million documents on an index with 5 primaries and 4 replicas (so each node could serve a request for any shard).

I used Pius' setup for JMeter to spawn 5 threads each running 40000 queries at a target rate of 20 queries a second (so with 5 threads it'd be targeted at 100 queries a second).

The query was a fairly complex query with function_score with multiple decay functions, expression scripts, geo_distance, and an extremely large terms filter.

During this test, I consistently stressed a single node by using

stress -i 8 -c 8 -m 8 -d 8

This does the following:

  • spawn 8 workers spinning on sqrt()
  • spawn 8 workers spinning on sync()
  • spawn 8 workers spinning on malloc()/free()
  • spawn 8 workers spinning on write()/unlink()

I did this to simulate a situation where one node in a cluster cannot keep up, whether because of an expensive query, or degraded disk, etc.

The desired behavior

The behavior I'd like to see in this situation is that the stressed node (es5) would eventually lower its threadpool queue down to a level where it would start rejecting requests, causing other nodes in the cluster to serve them. This would lead to better overall performance since the load would be routed away from the overloaded node.

The results with this branch as-is

First I tried this branch as-is, measuring how the threadpools changed and whether the desired behavior was achieved.

Spoiler alert: It was not.

My initial premise was based on the (deceptively simple) Little's Law formula of

L = λW

Where L is the optimal queue size, λ is the arrival rate of events, and W is the average time it takes to process a request.

I was measuring both λ and W, and calculating L to be used for the queue size.

I discovered a number of issues with this premise. First, by leaving both λ and W as "calculated" metrics, L is reported rather than targeted. This means that on es5, as W (avg response time) got higher due to the node being loaded, the estimated queue actually increased, which was not desired. I went back to the drawing board.
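To make the problem concrete, here is a tiny illustration with assumed numbers (not taken from the benchmark): the arrival rate stays the same, but as the stressed node's measured W grows, L = λ * W grows with it, so the queue would be raised exactly when it should shrink.

    // Assumed numbers, purely illustrative.
    double lambda = 15.0;      // measured arrival rate, tasks/second (unchanged by the stress)
    double wHealthy = 0.5;     // measured avg task time on a healthy node, seconds
    double wStressed = 5.0;    // measured avg task time on the stressed node, seconds
    System.out.println(lambda * wHealthy);  // L ≈ 7.5 -> small queue
    System.out.println(lambda * wStressed); // L = 75  -> the queue grows along with the overload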

The alternative implementation

"Okay" I thought, I need to make the relationship between the optimal queue size and actual queue size inverse, so that as the "optimal" queue went up, the actual queue size went down. The second thing I tried was to take the maximum queue size and subtract the optimal queue size, so that as the optimal queue size went up (L, also known as the long-term average number of tasks in the queue), the actual queue would go down.

This sort of worked, but it had a number of issues.

It felt hacky rather than like a genuinely good implementation: picking a good maximum queue size would be almost impossible without knowing up front where your queue size already sat. Set the max too high and the adjustment would have no effect; set it too low and you'd be rejecting requests before the node was actually under stress.

Additionally, it suffered from the same problem as the previous solution: both λ and W were being measured when they really should not be, which leads me to…

The alternative-alternative implementation

Back to the drawing board. I spent some time thinking and realized that we should not be measuring both λ and W and then deriving L; rather, one variable should be configured, one measured, and one the resulting optimal value.

With this in mind, I implemented an alternative where W is set by the user as a target_response_rate: a level at which the user essentially says "I am targeting my requests to all take less than N on average for a healthy node".

I then use λ as the measured task rate, similar to before. This means that with a target_response_rate of 5 seconds and a task rate of 15 queries/second, there should be at most 15 * 5 = 75 events in the queue at peak usage.
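A minimal sketch of the target-based calculation (names are illustrative; numbers mirror the example above):

    double measuredTaskRate = 15.0;      // λ: measured, tasks per second
    double targetResponseSeconds = 5.0;  // W: configured target_response_rate
    // L = λ * W: the most tasks that can be queued while still meeting the target.
    int optimalQueueSize = (int) (measuredTaskRate * targetResponseSeconds); // 75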

I tested my theory; with the same setup, I set up a target_response_rate of 5 seconds and adjustment amount of 50 (the queue would go up or down by 50 each time, just makes it easier to read the logs). I knew that this was a particularly complex query that I would be throwing at the nodes, and 5 seconds was a reasonable average response time.

I am happy to report that in my initial testing, with this setup the stressed node correctly routed traffic away from itself by lowering its queue and subsequently rejecting search requests, pushing them to the unstressed nodes and improving average response times for the entire run.

Here's an overall graph of the response times with "None" and "Auto" queue adjustment:

(image: jmeter-avg)

I also captured the handled tasks per second for an unstressed and a stressed node, as well as the queue adjustment for each node; you can check out the graphs below:

(images: es1-handled, es5-handled)

And you can see the actual versus "optimal" queue in these two graphs:

  • ES1 (un-stressed): (image: es1-queue)

  • ES5 (stressed): (image: es5-queue)

For more information as well as the raw data (this explanation is already long enough) you can see the rest at https://writequit.org/org/es/stress-run.html

I have been spending time thinking about this new direction (setting a reasonable target_response_rate) and I like it. I think it's something we should discuss; what are people's thoughts on the approaches I have taken?

@s1monw commented May 2, 2017

@dakrone thanks for running these tests, I'm happy we are trying things out 👍

Looking at your results, my first question would be: what is a good default target_response_rate, and can we come up with a way to adjust it automatically? To be clear, I am not even sure we have to. It's basically an SLA set by the user, which is certainly something a lot of users have a good idea of. The biggest issue is the default, since without a good default we might not be able to enable this by default. I am not saying we have to solve it in this PR, but it's something to think about.

@nik9000 (Member) commented May 2, 2017

> For more information as well as the raw data (this explanation is already long enough) you can see the rest at https://writequit.org/org/es/stress-run.html

Cool!

@ppf2 (Member) commented May 2, 2017

Good stuff! It's promising that you found an alt-alt implementation that is showing improvements 🙂 I think the target response rate is a reasonable configuration for those who really want to use it, and since this is intended to address a specific problem for high-volume search use cases only, we can make it an opt-in thing if we can't find a good default. What would happen if someone sets an overly aggressive target response rate (much lower than their actual average query processing time)? I am assuming it will just cause a lot more search rejections than the default queue size? We may also want to rename target_response_rate to something else (so we don't have admins set it to a number thinking that it will give them the response rate they are looking for :P).

@dakrone commented May 2, 2017

Response to @s1monw:

> what is a good default target_response_rate and can we come up with
> a way to adjust it automatically?

That's a good question, I set it to 5 seconds in this case mostly because I knew
how expensive this query was going to be. I think if we were to automatically
adjust this, it would have to be at a different level, since at a per-node
level, we don't know if a given response rate is considered "good" independent
of the others nodes' response rates. In other words, if a query takes 7 seconds,
we don't know if that's a "good" or a "bad" response time for that query without
looking at how long it takes other nodes to execute it.

> To be clear, I am not even sure we have to. It's basically an SLA set by the
> user, which is certainly something a lot of users have a good idea of.

I agree with this, it's something the user will always have more intimate
knowledge of than we will.

> The biggest issue is the default, since without a good default we might not be
> able to enable this by default.

Yep, the default is the hardest, however, we've already made a decision like
this by bounding the search threadpool queue already (by default, 1000 in
master), so I think we could get away with setting a reasonably large default
response rate, perhaps something like "10s" or even "5s", since I think it's
reasonable to assume users want responses to come back faster than this in
almost all cases (there are some cases where users are okay with queries taking
1 minute+, but I think these are the exception rather than the rule).


Response to @ppf2:

> I think the target response rate is a reasonable configuration for those who
> really want to use it, and since this is intended to address a specific
> problem for the high volume search use cases only, we can make it an opt-in
> thing if we can’t find a good default.

I agree, I do think this can really help people that do have this situation,
and I think setting a reasonable default (like "10s" or "5s" as I mentioned
above) would be a good place to start. We do this already with our queue_size,
so this would be just setting a default under a different moniker than
queue_size.

> What would happen if someone sets an overly aggressive target response rate
> (much lower than their actual avg query processing time)? I am assuming it
> will just cause a lot more search rejections than the default queue size?

I ran some numbers and made a graph!

For simplicity, let's assume queries arrive at a rate of 1 per second and take
5 seconds to execute, and that the target_response_rate is set to 2s on our node.
We assume 1 thread to handle queries. We go with a small frame window (5), so
that after every 5 executed tasks we calculate the optimal queue size. We also
start with a queue size of 25 and adjust by 15 each time (to best illustrate
what happens).

(image: overloaded-by-tasks)

At task #26 (where the first measurement/adjustment happens), 26 seconds have
elapsed, 5 total tasks have executed, and the queue is at 20. ES would then
calculate the optimal queue size, L = λW, with λ being the rate we can actually
handle (0.2 q/s) and the target response rate of 2 seconds as W. This gives us
a queue size of 0.2 * 2 = 0.4, or essentially 0, because our node currently
can't keep up with the arrival rate.

With our auto queuing adjustment and no min or max, the queue would trend
towards 0 and eventually only a single request at a time would be handled.
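The same arithmetic spelled out, with the assumed numbers from the example above:

    // One worker thread, 5-second tasks: after 26 seconds only 5 tasks have completed.
    double measuredRate = 5.0 / 26.0;        // ≈ 0.19, roughly 0.2 tasks/second
    double targetResponseSeconds = 2.0;      // configured target_response_rate
    double optimal = measuredRate * targetResponseSeconds; // ≈ 0.4, i.e. essentially 0
    // With no minimum bound, repeated -15 adjustments walk the queue from 25 toward 0.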

@s1monw commented May 3, 2017

> Yep, the default is the hardest, however, we've already made a decision like
> this by bounding the search threadpool queue already (by default, 1000 in
> master), so I think we could get away with setting a reasonably large default
> response rate, perhaps something like "10s" or even "5s", since I think it's
> reasonable to assume users want responses to come back faster than this in
> almost all cases (there are some cases where users are okay with queries taking
> 1 minute+, but I think these are the exception rather than the rule).

Really? 5 seconds? I was rather thinking of 300ms or so as a default. I mean, that is already on the high side?

@dakrone commented May 10, 2017

retest this please

@dakrone force-pushed the auto-queue-sizing branch 2 times, most recently from 33021ca to 8cbd20f, on May 12, 2017 at 20:10
@dakrone commented May 12, 2017

@s1monw as we discussed, I have updated this branch to implement the third behavior described in the benchmarking above. I've also changed it to use the same value (1000) for both the min and max of the queue_size, so that this has no user-facing changes as-is. Sorry for the rebasing; there were a lot of unrelated failures due to master and x-pack breakage, so I needed to rebase onto a working copy of master.

It should be ready for another review!

@s1monw (Contributor) left a review:

I left some comments. I think this is a good first iteration and we should get this in so we can gain some real world experience. We can improve as we do and we might end up with auto-adjusting the target response rate in the future.

}
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
Contributor:

Should we return a simple EsThreadPoolExecutor if the min and max queue sizes are the same here? I think that would be safer from a BWC perspective.

Member Author (dakrone):

From a BWC perspective it certainly would, though then a user couldn't turn on debug logging to see what kind of calculations we make about the optimal queue size.

I'll make the change though, since it's the safest thing to do.

}
} catch (ArithmeticException e) {
// There was an integer overflow, so just log about it, rather than adjust the queue size
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
Contributor:

can we detect this rather than catching this exception? I'd feel better about it if we could

Member Author (dakrone):

Let me make sure I understand what you're asking: we switched to using Math.toIntExact, so are you saying we should manually check whether the long value is > Integer.MAX_VALUE rather than catching the exception?
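If that is the suggestion, a manual check might look roughly like this (a sketch; optimalLong is a hypothetical name for the computed L = λ * W value):

    final long optimalLong = (long) (lambda * w);
    if (optimalLong > Integer.MAX_VALUE) {
        // too large to be a sensible queue size: log a warning and skip this adjustment
    } else {
        final int optimal = (int) optimalLong;
        // ... proceed with the capacity adjustment ...
    }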


@@ -126,7 +128,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.INDEX, ThreadPoolType.FIXED);
map.put(Names.BULK, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
Contributor:

So now we use the new FIXED_AUTO_QUEUE_SIZE, but if somebody had custom settings on their search threadpool, will they still work? I mean, are the settings for FIXED a true subset of the ones in FIXED_AUTO_QUEUE_SIZE?

Member Author (dakrone):

> I mean, are the settings for FIXED a true subset of the ones in FIXED_AUTO_QUEUE_SIZE?

They are a true subset; the new type makes only additive setting changes.

This PR adds a new thread pool type: `fixed_auto_queue_size`. This thread pool
behaves like a regular `fixed` threadpool, except that after every
`auto_queue_frame_size` operations (default: 10,000) in the thread pool,
[Little's Law](https://en.wikipedia.org/wiki/Little's_law) is calculated and
used to adjust the pool's `queue_size` either up or down by 50. A configurable
minimum and maximum are taken into account as well. When the min and max are
the same value, a regular fixed executor is used instead.

The `SEARCH` threadpool is changed to use this new type of thread pool. However,
the min and max are both set to 1000, meaning auto adjustment is opt-in rather
than opt-out.

Resolves elastic#3890
Labels: >feature, :Search/Search (Search-related issues that do not fall into other categories), v6.0.0-alpha2
8 participants