Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for IllegalStateException on Thread Rejection with Response Caching #120

Merged
merged 1 commit into from
Feb 28, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 215 additions & 22 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,17 +423,17 @@ public R execute() {
return getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited");
}

if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
return queueInThread().get();
} else {
try {
try {
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
return queueInThread().get();
} else {
return executeWithSemaphore();
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}

} catch (Exception e) {
Expand Down Expand Up @@ -504,12 +504,13 @@ private R executeWithSemaphore() {
* We don't throw an exception but just flip to synchronous execution so code doesn't need to change in order to switch a command from running on a separate thread to the calling thread.
*
* @return {@code Future<R>} Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Future.get()} in {@link ExecutionException#getCause()} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited or thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Future.get()} in {@link ExecutionException#getCause()} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited or thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Future.get()} in {@link ExecutionException#getCause()} if invalid arguments or state were used representing a user failure, not a system failure
*/
Expand Down Expand Up @@ -739,8 +740,8 @@ public R call() throws Exception {
}
}

// start execution
future.start();
// start execution and throw an exception if rejection occurs
future.start(true);

return future;
}
Expand Down Expand Up @@ -1336,6 +1337,7 @@ private class QueuedExecutionFuture implements CommandFuture<R> {
private final AtomicBoolean actualFutureExecuted = new AtomicBoolean(false);
private volatile R result; // the result of the get()
private volatile ExecutionException executionException; // in case an exception is thrown
private volatile HystrixRuntimeException rejectedException;
private volatile Future<R> actualFuture = null;
private volatile boolean isInterrupted = false;
private final CountDownLatch futureStarted = new CountDownLatch(1);
Expand All @@ -1346,10 +1348,17 @@ public QueuedExecutionFuture(HystrixCommand<R> command, ThreadPoolExecutor execu
this.callable = callable;
}

private void start() {
start(false);
}

/**
* Start execution of Callable<K> on ThreadPoolExecutor
*
* @param throwIfRejected
* since we want an exception thrown in the main queue() path but not via cached responses
*/
private void start() {
private void start(boolean throwIfRejected) {
// make sure we only start once
if (started.compareAndSet(false, true)) {
try {
Expand All @@ -1363,11 +1372,22 @@ private void start() {
// mark on counter
metrics.markThreadPoolRejection();
// use a fallback instead (or throw exception if not implemented)
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e));
try {
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e));
} catch (HystrixRuntimeException hre) {
actualFuture = asFuture(hre);
// store this so it can be thrown to queue()
rejectedException = hre;
}
} catch (Exception e) {
// unknown exception
logger.error(getLogMessagePrefix() + ": Unexpected exception while submitting to queue.", e);
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "had unexpected exception while attempting to queue for execution.", e));
try {
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "had unexpected exception while attempting to queue for execution.", e));
} catch (HystrixRuntimeException hre) {
actualFuture = asFuture(hre);
throw hre;
}
} finally {
futureStarted.countDown();
}
Expand All @@ -1390,6 +1410,10 @@ private void start() {
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "Unexpected interruption while waiting on other thread submitting to queue.", e));
}
}

if (throwIfRejected && rejectedException != null) {
throw rejectedException;
}
}

/**
Expand Down Expand Up @@ -1420,6 +1444,7 @@ public R get(long timeout, TimeUnit unit) throws CancellationException, Interrup
* execute even though the application as a whole continues.
*/
}

if (executionException != null) {
throw executionException;
} else {
Expand Down Expand Up @@ -1456,12 +1481,12 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
// we will countDown the latch and release threads
if (!started.get() || actualFuture == null) {
/**
* https://github.com/Netflix/Hystrix/issues/80
* https://github.com/Netflix/Hystrix/issues/113
*
* Output any extra information that can help tracking down how this failed
* as it most likely means there's a concurrency bug.
*/
throw new IllegalStateException("Future was not started. Key: "
throw new IllegalStateException("Response Not Available. Key: "
+ getCommandKey().name() + " ActualFuture: " + actualFuture
+ " Started: " + started.get() + " actualFutureExecuted: " + actualFutureExecuted.get()
+ " futureStarted: " + futureStarted.getCount()
Expand Down Expand Up @@ -1489,6 +1514,9 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
// we can't capture this in execute/queue so we do it here
metrics.markExceptionThrown();
}
} catch (ExecutionException e) {
// if the actualFuture itself throws an ExcecutionException we want to capture it
executionException = e;
} finally {
// mark that we are done and other threads can proceed
actualResponseReceived.countDown();
Expand All @@ -1515,11 +1543,17 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Override
public boolean isCancelled() {
/* in case another thread got to this (via cache) before the constructing thread started it, we'll optimistically try to start it and start() will ensure only one time wins */
start();
/* now 'actualFuture' will be set to something */
return actualFuture.isCancelled();
}

@Override
public boolean isDone() {
/* in case another thread got to this (via cache) before the constructing thread started it, we'll optimistically try to start it and start() will ensure only one time wins */
start();
/* now 'actualFuture' will be set to something */
return actualFuture.isDone();
}

Expand Down Expand Up @@ -1571,6 +1605,42 @@ public ExecutionResult getExecutionResult() {
};
}

private Future<R> asFuture(final HystrixRuntimeException e) {
return new CommandFuture<R>() {

@Override
public boolean cancel(boolean arg0) {
return false;
}

@Override
public R get() throws InterruptedException, ExecutionException {
throw new ExecutionException(e);
}

@Override
public R get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
return get();
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}

@Override
public ExecutionResult getExecutionResult() {
return executionResult;
}

};
}

/* ******************************************************************************** */
/* ******************************************************************************** */
/* TryableSemaphore */
Expand Down Expand Up @@ -4174,6 +4244,76 @@ public void testRequestCacheOnTimeoutThrowsException() throws Exception {
assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

@Test
public void testRequestCacheOnThreadRejectionThrowsException() throws Exception {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
CountDownLatch completionLatch = new CountDownLatch(1);
RequestCacheThreadRejectionWithoutFallback r1 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
try {
System.out.println("r1: " + r1.execute());
// we should have thrown an exception
fail("expected a rejection");
} catch (HystrixRuntimeException e) {
assertTrue(r1.isResponseRejected());
// what we want
}

RequestCacheThreadRejectionWithoutFallback r2 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
try {
System.out.println("r2: " + r2.execute());
// we should have thrown an exception
fail("expected a rejection");
} catch (HystrixRuntimeException e) {
// e.printStackTrace();
assertTrue(r2.isResponseRejected());
// what we want
}

RequestCacheThreadRejectionWithoutFallback r3 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
Future<Boolean> f3 = r3.queue();
try {
System.out.println("f3: " + f3.get());
// we should have thrown an exception
fail("expected a rejection");
} catch (ExecutionException e) {
// e.printStackTrace();
assertTrue(r3.isResponseRejected());
// what we want
}

// let the command finish (only 1 should actually be blocked on this do to the response cache)
completionLatch.countDown();

// then another after the command has completed
RequestCacheThreadRejectionWithoutFallback r4 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
try {
System.out.println("r4: " + r4.execute());
// we should have thrown an exception
fail("expected a rejection");
} catch (HystrixRuntimeException e) {
// e.printStackTrace();
assertTrue(r4.isResponseRejected());
assertFalse(r4.isResponseFromFallback());
// what we want
}

assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT));
assertEquals(3, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));

assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage());

assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

/**
* Test that we can do basic execution without a RequestVariable being initialized.
*/
Expand Down Expand Up @@ -5778,6 +5918,59 @@ public String getCacheKey() {
}
}

private static class RequestCacheThreadRejectionWithoutFallback extends TestHystrixCommand<Boolean> {

final CountDownLatch completionLatch;

public RequestCacheThreadRejectionWithoutFallback(TestCircuitBreaker circuitBreaker, CountDownLatch completionLatch) {
super(testPropsBuilder()
.setCircuitBreaker(circuitBreaker)
.setMetrics(circuitBreaker.metrics)
.setThreadPool(new HystrixThreadPool() {

@Override
public ThreadPoolExecutor getExecutor() {
return null;
}

@Override
public void markThreadExecution() {

}

@Override
public void markThreadCompletion() {

}

@Override
public boolean isQueueSpaceAvailable() {
// always return false so we reject everything
return false;
}

}));
this.completionLatch = completionLatch;
}

@Override
protected Boolean run() {
try {
if (completionLatch.await(1000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("timed out waiting on completionLatch");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}

@Override
public String getCacheKey() {
return "A";
}
}

private static class BadRequestCommand extends TestHystrixCommand<Boolean> {

public BadRequestCommand(TestCircuitBreaker circuitBreaker, ExecutionIsolationStrategy isolationType) {
Expand Down