Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread interrupt only on timeout #672

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,56 +101,56 @@ public void testGetUserSyncWithFallback() {
*/


// @Test
// public void testGetUserAsyncWithFallbackCommand() throws ExecutionException, InterruptedException {
// HystrixRequestContext context = HystrixRequestContext.initializeContext();
// try {
// Future<User> f1 = userService.getUserAsyncFallbackCommand(" ", "name: ");
//
// assertEquals("def", f1.get().getName());
//
// assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
// HystrixInvokableInfo<?> getUserAsyncFallbackCommand = getHystrixCommandByKey(
// "getUserAsyncFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");
//
// assertEquals("getUserAsyncFallbackCommand", getUserAsyncFallbackCommand.getCommandKey().name());
// // confirm that command has failed
// assertTrue(getUserAsyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // confirm that first fallback has failed
// assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // and that second fallback was successful
// assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
// } finally {
// context.shutdown();
// }
// }
//
// @Test
// public void testGetUserSyncWithFallbackCommand() {
// HystrixRequestContext context = HystrixRequestContext.initializeContext();
// try {
// User u1 = userService.getUserSyncFallbackCommand(" ", "name: ");
//
// assertEquals("def", u1.getName());
// assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
// HystrixInvokableInfo<?> getUserSyncFallbackCommand = getHystrixCommandByKey(
// "getUserSyncFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");
//
// assertEquals("getUserSyncFallbackCommand", getUserSyncFallbackCommand.getCommandKey().name());
// // confirm that command has failed
// assertTrue(getUserSyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // confirm that first fallback has failed
// assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // and that second fallback was successful
// assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
// } finally {
// context.shutdown();
// }
// }
@Test
public void testGetUserAsyncWithFallbackCommand() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<User> f1 = userService.getUserAsyncFallbackCommand(" ", "name: ");

assertEquals("def", f1.get().getName());

assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
HystrixInvokableInfo<?> getUserAsyncFallbackCommand = getHystrixCommandByKey(
"getUserAsyncFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");

assertEquals("getUserAsyncFallbackCommand", getUserAsyncFallbackCommand.getCommandKey().name());
// confirm that command has failed
assertTrue(getUserAsyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// confirm that first fallback has failed
assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// and that second fallback was successful
assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
} finally {
context.shutdown();
}
}

@Test
public void testGetUserSyncWithFallbackCommand() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
User u1 = userService.getUserSyncFallbackCommand(" ", "name: ");

assertEquals("def", u1.getName());
assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
HystrixInvokableInfo<?> getUserSyncFallbackCommand = getHystrixCommandByKey(
"getUserSyncFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");

assertEquals("getUserSyncFallbackCommand", getUserSyncFallbackCommand.getCommandKey().name());
// confirm that command has failed
assertTrue(getUserSyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// confirm that first fallback has failed
assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// and that second fallback was successful
assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
} finally {
context.shutdown();
}
}


public static class UserService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.ReplaySubject;
import rx.subscriptions.CompositeSubscription;
Expand Down Expand Up @@ -524,7 +525,13 @@ public void call(Subscriber<? super R> s) {
getExecutionObservableWithLifecycle().unsafeSubscribe(s); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
}
}
}).subscribeOn(threadPool.getScheduler(properties.executionIsolationThreadInterruptOnTimeout().get()));
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {

@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _self.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT);
}
}));
} else {
// semaphore isolated
executionHook.onRunStart(_self);
Expand Down Expand Up @@ -814,7 +821,7 @@ public Observable<R> call(Throwable t) {

return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe));
} else {
logger.debug("HystrixCommand execution " + failureType.name() + " and fallback retrieval failed.", fe);
logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
metrics.markFallbackFailure();
// record the executionResult
executionResult = executionResult.addEvents(HystrixEventType.FALLBACK_FAILURE);
Expand All @@ -826,7 +833,7 @@ public Observable<R> call(Throwable t) {
logger.warn("Error calling ExecutionHook.onError", hookException);
}

return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and failed retrieving fallback.", e, fe));
return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe));
}
}

Expand Down Expand Up @@ -938,7 +945,6 @@ public void tick() {
// if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
// otherwise it means we lost a race and the run() execution completed
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// do fallback logic
// report timeout failure
originalCommand.metrics.markTimeout(System.currentTimeMillis() - originalCommand.invocationStartTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory;
import rx.functions.Func0;

/**
* ThreadPool used to executed {@link HystrixCommand#run()} on separate threads when configured to do so with {@link HystrixCommandProperties#executionIsolationStrategy()}.
Expand All @@ -52,7 +53,7 @@ public interface HystrixThreadPool {

public Scheduler getScheduler();

public Scheduler getScheduler(boolean shouldInterruptThread);
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);

/**
* Mark when a thread begins executing a command.
Expand Down Expand Up @@ -155,8 +156,6 @@ public interface HystrixThreadPool {
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final Scheduler nonInterruptingScheduler;
private final Scheduler interruptingScheduler;

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
Expand All @@ -167,8 +166,6 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();
this.nonInterruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, false);
this.interruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, true);

/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
Expand All @@ -183,17 +180,18 @@ public ThreadPoolExecutor getExecutor() {
@Override
public Scheduler getScheduler() {
//by default, interrupt underlying threads on timeout
return getScheduler(true);
return getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return true;
}
});
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
if (shouldInterruptThread) {
return interruptingScheduler;
} else {
return nonInterruptingScheduler;
}
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

// allow us to change things via fast-properties by setting it each time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import rx.*;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.*;

Expand Down Expand Up @@ -48,11 +49,15 @@ public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, S
}

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool) {
this(concurrencyStrategy, threadPool, true);
this(concurrencyStrategy, threadPool, new Func0<Boolean>() {
@Override
public Boolean call() {
return true;
}
});
}


public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, boolean shouldInterruptThread) {
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
Expand Down Expand Up @@ -106,9 +111,9 @@ public Subscription schedule(Action0 action) {
private static class ThreadPoolScheduler extends Scheduler {

private final HystrixThreadPool threadPool;
private final boolean shouldInterruptThread;
private final Func0<Boolean> shouldInterruptThread;

public ThreadPoolScheduler(HystrixThreadPool threadPool, boolean shouldInterruptThread) {
public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
Expand All @@ -133,9 +138,9 @@ private static class ThreadPoolWorker extends Worker {

private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final boolean shouldInterruptThread;
private final Func0<Boolean> shouldInterruptThread;

public ThreadPoolWorker(HystrixThreadPool threadPool, boolean shouldInterruptThread) {
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
Expand Down Expand Up @@ -167,7 +172,6 @@ public Subscription schedule(final Action0 action) {
sa.addParent(subscription);

Future<?> f = threadPool.getExecutor().submit(sa);

sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread));

return sa;
Expand All @@ -185,17 +189,22 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
*/
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final Future<?> f;
private final boolean shouldInterruptThread;
private final Func0<Boolean> shouldInterruptThread;

private FutureCompleterWithConfigurableInterrupt(Future<?> f, boolean shouldInterruptThread) {
private FutureCompleterWithConfigurableInterrupt(Future<?> f, Func0<Boolean> shouldInterruptThread) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
}

@Override
public void unsubscribe() {
f.cancel(shouldInterruptThread);
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}

@Override
public boolean isUnsubscribed() {
return f.isCancelled();
Expand Down
Loading