Skip to content

Commit

Permalink
Makes it an error to store during assembly of a call
Browse files Browse the repository at this point in the history
Before this, there was some extra code in the throttle package handling
a bug in our in memory storage. This fixes that and removes the extra
code.

See #2502
  • Loading branch information
Adrian Cole committed May 10, 2019
1 parent 7f800e4 commit 8300976
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.storage.InMemoryStorage;

/**
* {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
Expand All @@ -41,39 +39,21 @@
*
* @see ThrottledStorageComponent
*/
final class ThrottledCall<V> extends Call<V> {
final class ThrottledCall<V> extends Call.Base<V> {
final ExecutorService executor;
final Limiter<Void> limiter;
final Listener limitListener;
/**
* supplier call needs to be supplied later to avoid having it take action when it is created
* (like {@link InMemoryStorage} and thus avoid being throttled.
*/
final Supplier<? extends Call<V>> supplier;
volatile Call<V> delegate;
volatile boolean canceled;

public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
Supplier<? extends Call<V>> supplier) {
final Call<V> delegate;

ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) {
this.executor = executor;
this.limiter = limiter;
this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
this.supplier = supplier;
this.delegate = delegate;
}

// TODO: refactor this when in-memory no longer executes storage ops during assembly time
ThrottledCall(ThrottledCall<V> other) {
this(other.executor, other.limiter,
other.delegate == null ? other.supplier : () -> other.delegate.clone());
}
@Override protected V doExecute() throws IOException {
Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);

// TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
// which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
// the need for assembly time throttling (fix to in-memory storage)
@Override public V execute() throws IOException {
try {
delegate = supplier.get();

// Make sure we throttle
Future<V> future = executor.submit(() -> {
String oldName = setCurrentThreadName(delegate.toString());
Expand Down Expand Up @@ -115,9 +95,11 @@ public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
}
}

@Override public void enqueue(Callback<V> callback) {
@Override protected void doEnqueue(Callback<V> callback) {
Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);

try {
executor.execute(new QueuedCall(callback));
executor.execute(new QueuedCall(callback, limitListener));
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
Expand All @@ -127,21 +109,12 @@ public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
}
}

@Override public void cancel() {
canceled = true;
if (delegate != null) delegate.cancel();
}

@Override public boolean isCanceled() {
return canceled || (delegate != null && delegate.isCanceled());
}

@Override public Call<V> clone() {
return new ThrottledCall<>(this);
return new ThrottledCall<>(executor, limiter, delegate.clone());
}

@Override public String toString() {
return "Throttled" + supplier;
return "Throttled" + delegate;
}

static String setCurrentThreadName(String name) {
Expand All @@ -153,17 +126,17 @@ static String setCurrentThreadName(String name) {

final class QueuedCall implements Runnable {
final Callback<V> callback;
final Listener limitListener;

QueuedCall(Callback<V> callback) {
QueuedCall(Callback<V> callback, Listener limitListener) {
this.callback = callback;
this.limitListener = limitListener;
}

@Override public void run() {
try {
if (isCanceled()) return;

delegate = ThrottledCall.this.supplier.get();

String oldName = setCurrentThreadName(delegate.toString());
try {
enqueueAndWait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public ThrottledStorageComponent(StorageComponent delegate, MeterRegistry regist
return "Throttled" + delegate;
}

final class ThrottledSpanConsumer implements SpanConsumer {
static final class ThrottledSpanConsumer implements SpanConsumer {
final SpanConsumer delegate;
final Limiter<Void> limiter;
final ExecutorService executor;
Expand All @@ -116,7 +116,7 @@ final class ThrottledSpanConsumer implements SpanConsumer {
}

@Override public Call<Void> accept(List<Span> spans) {
return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
}

@Override public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.netflix.concurrency.limits.Limiter.Listener
import com.netflix.concurrency.limits.limit.SettableLimit
import com.netflix.concurrency.limits.limiter.SimpleLimiter
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
Expand All @@ -41,39 +42,34 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.Semaphore
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Supplier

// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be
// refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way)
class ThrottledCallTest {
var limit = SettableLimit.startingAt(0)
var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
val limit = SettableLimit.startingAt(0)
val limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()

inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
val numThreads = 1
val executor = Executors.newSingleThreadExecutor();
@After fun shutdownExecutor() = executor.shutdown()

@Test fun callCreation_isDeferred() {
val created = booleanArrayOf(false)
inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)

val throttle = createThrottle(Supplier {
created[0] = true
Call.create<Void>(null)
})
@Test fun niceToString() {
val delegate: Call<Void> = mock()
`when`(delegate.toString()).thenReturn("StoreSpansCall{}")

assertThat(created).contains(false)
throttle.execute()
assertThat(created).contains(true)
assertThat(ThrottledCall(executor, limiter, delegate))
.hasToString("ThrottledStoreSpansCall{}")
}

@Test fun execute_isThrottled() {
val numThreads = 1
val queueSize = 1
val totalTasks = numThreads + queueSize
limit.limit = totalTasks

val startLock = Semaphore(numThreads)
val waitLock = Semaphore(totalTasks)
val failLock = Semaphore(1)
val throttle =
createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
val throttled = throttle(LockedCall(startLock, waitLock))

// Step 1: drain appropriate locks
startLock.drainPermits()
Expand All @@ -83,7 +79,7 @@ class ThrottledCallTest {
// Step 2: saturate threads and fill queue
val backgroundPool = Executors.newCachedThreadPool()
for (i in 0 until totalTasks) {
backgroundPool.submit(Callable { throttle.execute() })
backgroundPool.submit(Callable { throttled.clone().execute() })
}

try {
Expand All @@ -93,7 +89,7 @@ class ThrottledCallTest {
// Step 4: submit something beyond our limits
val future = backgroundPool.submit(Callable {
try {
throttle.execute()
throttled.execute()
} catch (e: IOException) {
throw RuntimeException(e)
} finally {
Expand Down Expand Up @@ -125,7 +121,7 @@ class ThrottledCallTest {
val call = FakeCall()
call.overCapacity = true

val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
val throttle = ThrottledCall(executor, mockLimiter(listener), call)
try {
throttle.execute()
assertThat(true).isFalse() // should raise a RejectedExecutionException
Expand All @@ -137,8 +133,7 @@ class ThrottledCallTest {
@Test fun execute_ignoresLimit_whenPoolFull() {
val listener: Listener = mock()

val throttle =
ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
val throttle = ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
try {
throttle.execute()
assertThat(true).isFalse() // should raise a RejectedExecutionException
Expand All @@ -148,14 +143,13 @@ class ThrottledCallTest {
}

@Test fun enqueue_isThrottled() {
val numThreads = 1
val queueSize = 1
val totalTasks = numThreads + queueSize
limit.limit = totalTasks

val startLock = Semaphore(numThreads)
val waitLock = Semaphore(totalTasks)
val throttle =
createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
val throttle = throttle(LockedCall(startLock, waitLock))

// Step 1: drain appropriate locks
startLock.drainPermits()
Expand All @@ -164,15 +158,15 @@ class ThrottledCallTest {
// Step 2: saturate threads and fill queue
val callback: Callback<Void> = mock()
for (i in 0 until totalTasks) {
throttle.enqueue(callback)
throttle.clone().enqueue(callback)
}

// Step 3: make sure the threads actually started
startLock.acquire(numThreads)

try {
// Step 4: submit something beyond our limits and make sure it fails
throttle.enqueue(callback)
throttle.clone().enqueue(callback)

assertThat(true).isFalse() // should raise a RejectedExecutionException
} catch (e: RejectedExecutionException) {
Expand All @@ -187,7 +181,7 @@ class ThrottledCallTest {
val call = FakeCall()
call.overCapacity = true

val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
val throttle = ThrottledCall(executor, mockLimiter(listener), call)
val latch = CountDownLatch(1)
throttle.enqueue(object : Callback<Void> {
override fun onSuccess(value: Void) {
Expand All @@ -207,7 +201,7 @@ class ThrottledCallTest {
val listener: Listener = mock()

val throttle =
ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
try {
throttle.enqueue(null)
assertThat(true).isFalse() // should raise a RejectedExecutionException
Expand All @@ -216,18 +210,7 @@ class ThrottledCallTest {
}
}

private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> {
return createThrottle(1, 1, delegate)
}

private fun createThrottle(
poolSize: Int,
queueSize: Int,
delegate: Supplier<Call<Void>>
): ThrottledCall<Void> {
limit.setLimit(limit.getLimit() + 1)
return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
}
private fun throttle(delegate: Call<Void>) = ThrottledCall(executor, limiter, delegate)

private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() {
override fun doExecute(): Void? {
Expand All @@ -252,11 +235,6 @@ class ThrottledCallTest {
override fun clone() = LockedCall(startLock, waitLock);
}

private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
LinkedBlockingQueue(queueSize))
}

private fun mockExhaustedPool(): ExecutorService {
val mock: ExecutorService = mock()
doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
Expand Down
56 changes: 56 additions & 0 deletions zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Test;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.Endpoint;
import zipkin2.Span;
import zipkin2.internal.Trace;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static zipkin2.TestObjects.BACKEND;
import static zipkin2.TestObjects.CLIENT_SPAN;
import static zipkin2.TestObjects.DAY;
Expand Down Expand Up @@ -106,6 +110,58 @@ protected SpanStore store() {
allShouldWorkWhenEmpty();
}

@Test public void consumer_properlyImplementsCallContract_execute() throws IOException {
Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));

// Ensure the implementation didn't accidentally do I/O at assembly time.
assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
call.execute();

assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
.containsExactly(LOTS_OF_SPANS[0]);

try {
call.execute();
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalStateException e) {
}

// no problem to clone a call
call.clone().execute();
}

@Test public void consumer_properlyImplementsCallContract_submit() throws Exception {
Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
// Ensure the implementation didn't accidentally do I/O at assembly time.
assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();

CountDownLatch latch = new CountDownLatch(1);
Callback<Void> callback = new Callback<Void>() {
@Override public void onSuccess(Void value) {
latch.countDown();
}

@Override public void onError(Throwable t) {
latch.countDown();
}
};

call.enqueue(callback);
latch.await();

assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
.containsExactly(LOTS_OF_SPANS[0]);

try {
call.enqueue(callback);
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalStateException e) {
}

// no problem to clone a call
call.clone().execute();
}

/**
* Ideally, storage backends can deduplicate identical documents as this will prevent some
* analysis problems such as double-counting dependency links or other statistics. While this test
Expand Down
Loading

0 comments on commit 8300976

Please sign in to comment.