Implemented a heap-based circuit breaker #2155
…revent entry buffers from accepting events after the heap usage reaches a specified value. Resolves opensearch-project#2150. Signed-off-by: David Venable <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2155      +/-   ##
============================================
+ Coverage     93.99%   94.12%   +0.13%
- Complexity     1633     1694      +61
============================================
  Files           204      214      +10
  Lines          4729     4887     +158
  Branches        378      392      +14
============================================
+ Hits           4445     4600     +155
- Misses          194      197       +3
  Partials         90       90
private final HeapCircuitBreaker heapCircuitBreaker;

CircuitBreakerService(final CircuitBreakerConfig circuitBreakerConfig) {
    if(circuitBreakerConfig != null && circuitBreakerConfig.getHeapConfig() != null) {
Take it or leave it - we could iterate over a collection here to construct the global circuit breaker. One idea for this would be to maintain a Map<Condition, Function<CircuitBreakerConfig, CircuitBreaker>> to construct a list of CircuitBreakers passed to the GlobalCircuitBreaker constructor
Something like
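import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CircuitBreakerService {
    // Each entry pairs a condition on the config with a factory for the matching breaker.
    // Predicate is used because a primitive boolean cannot be a generic type argument.
    private static final Map<Predicate<CircuitBreakerConfig>, Function<CircuitBreakerConfig, CircuitBreaker>> CIRCUIT_BREAKER_CREATORS = Map.of(
            circuitBreakerConfig -> Objects.nonNull(circuitBreakerConfig.getHeapConfig()),
            circuitBreakerConfig -> new HeapCircuitBreaker(circuitBreakerConfig.getHeapConfig())
            /* , ... further condition/creator pairs */);
    private final CircuitBreaker globalCircuitBreaker;

    CircuitBreakerService(final CircuitBreakerConfig circuitBreakerConfig) {
        if (Objects.nonNull(circuitBreakerConfig)) {
            final List<CircuitBreaker> circuitBreakers = CIRCUIT_BREAKER_CREATORS.entrySet().stream()
                    .filter(entry -> entry.getKey().test(circuitBreakerConfig))
                    .map(entry -> entry.getValue().apply(circuitBreakerConfig))
                    .collect(Collectors.toList());
            globalCircuitBreaker = new GlobalCircuitBreaker(circuitBreakers);
        } else {
            // A final field must be assigned on every path through the constructor.
            globalCircuitBreaker = null;
        }
    }
    ...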
I thought about that as well, but I also thought it wasn't necessary now. We could add this when we add a new circuit breaker.
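For illustration, the `GlobalCircuitBreaker` mentioned in the suggestion above could be a simple composite. The sketch below is an assumption about how such a class might behave, not code from this PR: the `CircuitBreaker` interface is reduced to the single `isOpen()` method visible in the diff, and `CompositeCircuitBreakerSketch` is a hypothetical name.

```java
import java.util.List;

// Hypothetical, reduced interface: only the isOpen() method seen in the diff.
interface CircuitBreaker {
    boolean isOpen();
}

// Hypothetical composite: reports open whenever any delegate breaker is open.
final class CompositeCircuitBreakerSketch implements CircuitBreaker {
    private final List<CircuitBreaker> delegates;

    CompositeCircuitBreakerSketch(final List<CircuitBreaker> delegates) {
        // Defensive, immutable copy so later changes to the caller's list have no effect.
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public boolean isOpen() {
        return delegates.stream().anyMatch(CircuitBreaker::isOpen);
    }
}
```

With an empty delegate list the composite is always closed, which matches the case where no circuit breaker is configured.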
@Override
public void write(final T record, final int timeoutInMillis) throws TimeoutException {
    checkBreaker();
Do we expect any performance impact from checking the breakers?
I did some testing of the call to check memory usage and it took less than a millisecond.
I did add a locking mechanism since we have multiple threads checking at the same time. That might have an impact.
An alternative approach I considered was to schedule a task (say, every 1 second) to check the memory usage and update the breaker value, rather than have one thread handle it. Then this `checkBreaker` is really just reading a boolean value.
Having a dedicated task checking the memory seems like a good idea. Do you see any downsides to that approach?
The extra thread may use some CPU, but that is probably not too much of an issue.
I'll probably not add any synchronization in this case, since the exact timing isn't too important (some threads may see the circuit breaker open before others). This should remove any performance impact from synchronization. Let me know if you think otherwise.
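The scheduled-check approach discussed in this thread can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the class name `ScheduledHeapBreakerSketch`, the injected `LongSupplier`, and the constructor parameters are all hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch: a background task samples heap usage and publishes the
// result through a volatile flag, so isOpen() is a plain read with no locking.
final class ScheduledHeapBreakerSketch implements AutoCloseable {
    private final long usageBytes;
    private final LongSupplier usedMemoryBytes;
    private final ScheduledExecutorService scheduler;
    private volatile boolean open = false;

    // checkInterval must be positive; scheduleAtFixedRate rejects a zero period.
    ScheduledHeapBreakerSketch(final long usageBytes, final Duration checkInterval,
                               final LongSupplier usedMemoryBytes) {
        this.usageBytes = usageBytes;
        this.usedMemoryBytes = usedMemoryBytes;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            final Thread thread = new Thread(runnable, "heap-breaker-sketch");
            thread.setDaemon(true); // do not keep the JVM alive for this task
            return thread;
        });
        checkMemory(); // initial check on the constructing thread
        final long intervalMillis = checkInterval.toMillis();
        scheduler.scheduleAtFixedRate(this::checkMemory, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // One possible source of used heap: total minus free, as reported by the JVM.
    static long jvmUsedMemoryBytes() {
        final Runtime runtime = Runtime.getRuntime();
        return runtime.totalMemory() - runtime.freeMemory();
    }

    // Package-private so it can also be invoked directly, for example in a test.
    void checkMemory() {
        open = usedMemoryBytes.getAsLong() > usageBytes;
    }

    boolean isOpen() {
        return open;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A caller might construct it with `ScheduledHeapBreakerSketch::jvmUsedMemoryBytes` as the supplier. Because the flag is volatile, writer threads see an updated value without taking a lock, at the cost of possibly observing the change up to one check interval late.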
private void checkBreaker() throws TimeoutException {
    if(circuitBreaker.isOpen())
        throw new TimeoutException("Circuit breaker is open. Unable to write to buffer.");
Should this be a custom exception, maybe CircuitBreakingException?
I used the `TimeoutException` because that is what `Buffer` currently throws for such situations. Callers of `Buffer` already have some expectation here on what it throws. Maybe it won't affect existing callers to add a new exception. I could check usages.
My thought was that callers may want to distinguish between the circuit breaker being tripped vs other timeout scenarios to decide on what action should be taken. If adding the new exception would require a bunch of refactors to the callers then it's probably not worth it for now
One other thought on this is that I tend to think the sources (which write to buffers) probably should not be too aware of the different reasons the buffer can fill.
If we want some form of global action, we could add that here.
I propose leaving this as it is currently written. In the future if we have a concrete need, we can discuss this idea further and make the change.
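If the idea is revisited later, one option that keeps existing callers working is a custom exception that extends `TimeoutException`. The sketch below is only an illustration of that trade-off, not what this PR does: `CircuitBreakingExceptionSketch`, `WritableBufferSketch`, and `CircuitBreakingBufferSketch` are hypothetical names, and the buffer interface is reduced to the single `write` method shown in the diff.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical exception type. Because it extends TimeoutException, callers
// that already catch TimeoutException continue to work unchanged.
class CircuitBreakingExceptionSketch extends TimeoutException {
    CircuitBreakingExceptionSketch(final String message) {
        super(message);
    }
}

// Hypothetical, simplified stand-in for the buffer's write path.
interface WritableBufferSketch<T> {
    void write(T record, int timeoutInMillis) throws TimeoutException;
}

// Decorator that rejects writes while the breaker reports open and otherwise
// forwards to the wrapped buffer.
final class CircuitBreakingBufferSketch<T> implements WritableBufferSketch<T> {
    private final WritableBufferSketch<T> delegate;
    private final BooleanSupplier breakerIsOpen;

    CircuitBreakingBufferSketch(final WritableBufferSketch<T> delegate, final BooleanSupplier breakerIsOpen) {
        this.delegate = delegate;
        this.breakerIsOpen = breakerIsOpen;
    }

    @Override
    public void write(final T record, final int timeoutInMillis) throws TimeoutException {
        if (breakerIsOpen.getAsBoolean()) {
            throw new CircuitBreakingExceptionSketch("Circuit breaker is open. Unable to write to buffer.");
        }
        delegate.write(record, timeoutInMillis);
    }
}
```

A source that wants to react specifically to a tripped breaker could catch the subclass first, while every other caller keeps its existing `catch (TimeoutException e)` handling.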
final Buffer buffer;
if(source instanceof PipelineConnector) {
    buffer = multiBufferDecorator;
Could we use the CircuitBreakingBuffer here with a higher usage threshold? Or perhaps use a slightly lower usage threshold than specified for non-PipelineConnector sources. Slightly offsetting these should avoid deadlock while preventing OOM. Though performance would take a hit
That might be a good follow-on.
Currently, the `PipelineConnector` shuts down the whole pipeline if it fails to write to the buffer. So supporting this might be a slightly larger change.
Signed-off-by: David Venable <[email protected]>
@Override
public boolean isOpen() {
    if(lock.tryLock()) {
This is going to have poor performance. Instead of each thread needing to acquire the lock to check, would it be possible to cache the result here, or to use a write-only lock that allows checking without acquiring the lock?
Edit: I see that there is some caching happening here since `tryLock` won't block. It would still be better not to need a lock at all to check the circuit breaker, if that's possible.
Yes, it is non-blocking, but I agree we can drop the synchronization. I pushed a change which runs a scheduled task to check. It will use a volatile field instead of synchronization to help with performance on the calls to `isOpen`.
}

final long usedMemoryBytes = getUsedMemoryBytes();
if(usedMemoryBytes > usageBytes) {
Should we give a hint to GC to flush, like `System.gc()`?
Good idea. I'll add that as well.
…tBreakerService to CircuitBreakerManager. Added an integration test. Signed-off-by: David Venable <[email protected]>
…rry about synchronizing it. Signed-off-by: David Venable <[email protected]>
…ps. Also, corrected another test to use a sleep. Signed-off-by: David Venable <[email protected]>
…aker tests. Signed-off-by: David Venable <[email protected]>
… out scheduled threads. Signed-off-by: David Venable <[email protected]>
Signed-off-by: David Venable <[email protected]>
docs/configuration.md
Outdated
* `usage` - float - The absolute value of JVM memory which will trip the circuit breaker. This can be defined with bytes (`b`), kilobytes (`kb`), megabytes (`mb`), or gigabytes (`gb`).
* `reset` - Duration - The time between when the circuit is tripped and the next attempt to validate will occur.
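As a rough illustration of how a `usage` value with the unit suffixes listed above might be turned into a byte count, here is a minimal parser sketch. It is not the `ByteCount` class added in this PR: `ByteCountSketch` and `parseBytes` are hypothetical names, and treating the units as binary multiples (1 kb = 1024 bytes) is an assumption of this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for values such as "512mb" or "1.5gb".
final class ByteCountSketch {
    // A non-negative decimal number immediately followed by one of the four units.
    private static final Pattern FORMAT = Pattern.compile("^(\\d+(?:\\.\\d+)?)(b|kb|mb|gb)$");

    private ByteCountSketch() { }

    static long parseBytes(final String text) {
        if (text == null) {
            throw new IllegalArgumentException("Byte count must not be null");
        }
        final Matcher matcher = FORMAT.matcher(text);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid byte count: " + text);
        }
        final double value = Double.parseDouble(matcher.group(1));
        final long multiplier;
        switch (matcher.group(2)) {
            case "kb": multiplier = 1024L; break;
            case "mb": multiplier = 1024L * 1024L; break;
            case "gb": multiplier = 1024L * 1024L * 1024L; break;
            default: multiplier = 1L; // plain bytes ("b")
        }
        // Fractional results are truncated toward zero.
        return (long) (value * multiplier);
    }
}
```

Rejecting anything that does not match the pattern, rather than guessing a unit, means a typo in the configuration fails loudly instead of silently setting an unexpected threshold.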
Should we add the checkInterval here as well?
Yes, thanks for the reminder!
…to 500ms from 1s. Signed-off-by: David Venable <[email protected]>
@engechas, I added the documentation. I also changed the default …
Implemented a heap-based circuit breaker. This circuit breaker will prevent entry buffers from accepting events after the heap usage reaches a specified value. This checks for heap usage in a background thread and updates the state, which the buffer will then use to determine if the circuit breaker is open or closed. This also signals to the JVM to start a GC when the threshold is reached. Resolves opensearch-project#2150. Signed-off-by: David Venable <[email protected]> Signed-off-by: mahesh724 <[email protected]>
Description
This PR adds a heap-based circuit breaker. If the buffer is configured with a value too low and the heap size grows large enough, this will kick in to prevent writing to the buffer.
The circuit breaker is implemented in `HeapCircuitBreaker`. The `CircuitBreakingBuffer` is a decorator around `Buffer` which will use the global `CircuitBreaker` as provided by the `CircuitBreaker` service. Right now, there is only one `CircuitBreaker` implementation - the `HeapCircuitBreaker`.
In addition, in order to support a byte format similar to the Data Prepper duration, I added a `ByteCount` value class and a `ByteCountDeserializer`.
Issues Resolved
Resolves #2150
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.