Skip to content

Commit

Permalink
Merge pull request #61 from benjchristensen/request-log-with-timeouts
Browse files Browse the repository at this point in the history
Request log with timeouts
  • Loading branch information
benjchristensen committed Dec 21, 2012
2 parents 40bb191 + 39dbc91 commit c14b3a0
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 66 deletions.
144 changes: 80 additions & 64 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,27 +416,20 @@ public final R execute() {
return getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited");
}

try {

if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
R r = queueInThread().get();
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
return queueInThread().get();
} else {
try {
R r = executeWithSemaphore();
return r;
} else {
try {
R r = executeWithSemaphore();
return r;
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}

} finally {
// the total execution time for the user thread including queuing, thread scheduling, run() execution
metrics.addUserThreadExecutionTime(System.currentTimeMillis() - invocationStartTime.get());
}

} catch (Throwable e) {
if (e instanceof HystrixBadRequestException) {
throw (HystrixBadRequestException) e;
Expand All @@ -457,20 +450,7 @@ public final R execute() {
throw new HystrixRuntimeException(FailureType.COMMAND_EXCEPTION, this.getClass(), message, e, null);
}
} finally {
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (concurrencyStrategy instanceof HystrixConcurrencyStrategyDefault) {
// if we're using the default we support only optionally using a request context
if (HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
} else {
// if it's a custom strategy it must ensure the context is initialized
if (HystrixRequestLog.getCurrentRequest(concurrencyStrategy) != null) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
}
}
recordExecutedCommand();
}
}

Expand All @@ -495,6 +475,9 @@ private R executeWithSemaphore() {
return response;
} finally {
executionSemaphore.release();

/* execution time on execution via semaphore */
recordTotalExecutionTime(invocationStartTime.get());
}
} else {
// mark on counter
Expand Down Expand Up @@ -557,20 +540,7 @@ public final Future<R> queue() {
throw e;
}
} finally {
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (concurrencyStrategy instanceof HystrixConcurrencyStrategyDefault) {
// if we're using the default we support only optionally using a request context
if (HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
} else {
// if it's a custom strategy it must ensure the context is initialized
if (HystrixRequestLog.getCurrentRequest(concurrencyStrategy) != null) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
}
}
recordExecutedCommand();
}
}

Expand Down Expand Up @@ -655,6 +625,9 @@ public ExecutionResult getExecutionResult() {
executionCompleted.countDown();
// release the semaphore
executionSemaphore.release();

/* execution time on queue via semaphore */
recordTotalExecutionTime(invocationStartTime.get());
}
} else {
metrics.markSemaphoreRejection();
Expand All @@ -665,9 +638,6 @@ public ExecutionResult getExecutionResult() {
}

private Future<R> queueInThread() {
// we want to run as a thread
long startTime = System.currentTimeMillis();

// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
isExecutedInThread.set(true);

Expand All @@ -677,7 +647,7 @@ private Future<R> queueInThread() {
// a snapshot of time so the thread can measure how long it waited before executing
final long timeBeforeExecution = System.currentTimeMillis();
// wrap the synchronous execute() method in a Callable and execute in the threadpool
QueuedExecutionFuture future = new QueuedExecutionFuture(this, startTime, threadPool.getExecutor(), new HystrixContextCallable<R>(new Callable<R>() {
QueuedExecutionFuture future = new QueuedExecutionFuture(this, threadPool.getExecutor(), new HystrixContextCallable<R>(new Callable<R>() {

@Override
public R call() throws Exception {
Expand Down Expand Up @@ -806,14 +776,6 @@ private R executeCommand() {
executionResult = executionResult.setException(e);
return getFallbackOrThrowException(HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", e);
} finally {
/*
* we record the executionTime for command execution
* if the command is never executed (rejected, short-circuited, etc) then it will be left unset
* for this metric we include failures and successes as we use it for per-request profiling and debugging
* whereas 'metrics.addCommandExecutionTime(duration)' is used by stats across many requests
*/
executionResult = executionResult.setExecutionTime((int) (System.currentTimeMillis() - startTime));

// record that we're completed
isExecutionComplete.set(true);
}
Expand Down Expand Up @@ -850,6 +812,47 @@ private R getFallbackWithProtection() {
}
}

/**
* Record the duration of execution as response or exception is being returned to the caller.
*/
private void recordTotalExecutionTime(long startTime) {
long duration = System.currentTimeMillis() - startTime;
// the total execution time for the user thread including queuing, thread scheduling, run() execution
metrics.addUserThreadExecutionTime(duration);

/*
* We record the executionTime for command execution.
*
* If the command is never executed (rejected, short-circuited, etc) then it will be left unset.
*
* For this metric we include failures and successes as we use it for per-request profiling and debugging
* whereas 'metrics.addCommandExecutionTime(duration)' is used by stats across many requests.
*/
executionResult = executionResult.setExecutionTime((int) duration);
}

/**
* Record that this command was executed in the HystrixRequestLog.
* <p>
* This can be treated as an async operation as it just adds a references to "this" in the log even if the current command is still executing.
*/
private void recordExecutedCommand() {
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (concurrencyStrategy instanceof HystrixConcurrencyStrategyDefault) {
// if we're using the default we support only optionally using a request context
if (HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
} else {
// if it's a custom strategy it must ensure the context is initialized
if (HystrixRequestLog.getCurrentRequest(concurrencyStrategy) != null) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
}
}
}

/**
* Whether the 'circuit-breaker' is open meaning that <code>execute()</code> will immediately return
* the <code>getFallback()</code> response and not attempt a HystrixCommand execution.
Expand Down Expand Up @@ -1240,7 +1243,6 @@ private class QueuedExecutionFuture implements CommandFuture<R> {
private final ThreadPoolExecutor executor;
private final Callable<R> callable;
private final HystrixCommand<R> command;
private final long startTime;
private final CountDownLatch actualResponseReceived = new CountDownLatch(1);
private final AtomicBoolean actualFutureExecuted = new AtomicBoolean(false);
private volatile R result; // the result of the get()
Expand All @@ -1249,9 +1251,8 @@ private class QueuedExecutionFuture implements CommandFuture<R> {
private final CountDownLatch futureStarted = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);

public QueuedExecutionFuture(HystrixCommand<R> command, long startTime, ThreadPoolExecutor executor, Callable<R> callable) {
public QueuedExecutionFuture(HystrixCommand<R> command, ThreadPoolExecutor executor, Callable<R> callable) {
this.command = command;
this.startTime = startTime;
this.executor = executor;
this.callable = callable;
}
Expand Down Expand Up @@ -1374,7 +1375,7 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
// mark this command as timed-out so the run() when it completes can ignore it
if (isCommandTimedOut.compareAndSet(false, true)) {
// report timeout failure (or skip this if the compareAndSet failed as that means a thread-race occurred with the execution as the object lived in the queue too long)
metrics.markTimeout(System.currentTimeMillis() - startTime);
metrics.markTimeout(System.currentTimeMillis() - invocationStartTime.get());
}

try {
Expand All @@ -1388,6 +1389,9 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
} finally {
// mark that we are done and other threads can proceed
actualResponseReceived.countDown();

/* execution time on threaded execution */
recordTotalExecutionTime(invocationStartTime.get());
}
}

Expand Down Expand Up @@ -2356,7 +2360,7 @@ public void testExecutionTimeoutWithNoFallback() {
command.execute();
fail("we shouldn't get here");
} catch (Exception e) {
e.printStackTrace();
// e.printStackTrace();
if (e instanceof HystrixRuntimeException) {
HystrixRuntimeException de = (HystrixRuntimeException) e;
assertNotNull(de.getFallbackException());
Expand All @@ -2368,6 +2372,9 @@ public void testExecutionTimeoutWithNoFallback() {
fail("the exception should be HystrixRuntimeException");
}
}
// the time should be 50+ since we timeout at 50ms
assertTrue("Execution Time is: " + command.getExecutionTimeInMilliseconds(), command.getExecutionTimeInMilliseconds() >= 50);

assertTrue(command.isResponseTimedOut());
assertFalse(command.isResponseFromFallback());
assertFalse(command.isResponseRejected());
Expand Down Expand Up @@ -2397,6 +2404,8 @@ public void testExecutionTimeoutWithFallback() {
TestHystrixCommand<Boolean> command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_SUCCESS);
try {
assertEquals(false, command.execute());
// the time should be 50+ since we timeout at 50ms
assertTrue("Execution Time is: " + command.getExecutionTimeInMilliseconds(), command.getExecutionTimeInMilliseconds() >= 50);
assertTrue(command.isResponseTimedOut());
assertTrue(command.isResponseFromFallback());
} catch (Exception e) {
Expand Down Expand Up @@ -2442,7 +2451,8 @@ public void testExecutionTimeoutFallbackFailure() {
fail("the exception should be HystrixRuntimeException");
}
}

// the time should be 50+ since we timeout at 50ms
assertTrue("Execution Time is: " + command.getExecutionTimeInMilliseconds(), command.getExecutionTimeInMilliseconds() >= 50);
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
Expand Down Expand Up @@ -4404,6 +4414,12 @@ protected Boolean run() {
Thread.sleep(timeout * 10);
} catch (InterruptedException e) {
e.printStackTrace();
// ignore and sleep some more to simulate a dependency that doesn't obey interrupts
try {
Thread.sleep(timeout * 2);
} catch (Exception e2) {
// ignore
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public static Collection<HystrixCommandMetrics> getInstances() {
this.group = commandGroup;
this.properties = properties;
this.counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds(), properties.metricsRollingStatisticalWindowBuckets());
this.percentileExecution = new HystrixRollingPercentile(properties.metricsRollingPercentileWindow(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.percentileTotal = new HystrixRollingPercentile(properties.metricsRollingPercentileWindow(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.percentileExecution = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.percentileTotal = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.eventNotifier = eventNotifier;
}

Expand Down

0 comments on commit c14b3a0

Please sign in to comment.