Skip to content

Commit

Permalink
Add ProgressCheck callbacks to end-to-end acknowledgements
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Oct 31, 2023
1 parent 8b674cd commit c486f01
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface AcknowledgementSetManager {
*/
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);

AcknowledgementSet create(final Consumer<Double> progressCheckCallback, final Duration progressCheckInterval, final Consumer<Boolean> callback, final Duration timeout);
/**
* Releases an event's reference
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ public void acquire(final EventHandle eventHandle) {
}

public void release(final EventHandle eventHandle, final boolean success) {
System.out.println("....release..1");
if (eventHandle == null) {
numNullHandles.incrementAndGet();
return;
}
System.out.println("....release..2");
DefaultAcknowledgementSet acknowledgementSet = getAcknowledgementSet(eventHandle);
lock.lock();
boolean exists = false;
Expand All @@ -99,15 +101,18 @@ public void release(final EventHandle eventHandle, final boolean success) {
} finally {
lock.unlock();
}
System.out.println("....release..3");
// if acknowledgementSet doesn't exist then it means some late
// arrival of event handle release after the acknowledgement set
// is cleaned up.
if (exists) {
System.out.println("....release..4");
boolean b = acknowledgementSet.release(eventHandle, success);
} else {
LOG.warn("Trying to release from an AcknowledgementSet that does not exist");
numInvalidReleases.incrementAndGet();
}
System.out.println("....release..5");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DefaultAcknowledgementSet implements AcknowledgementSet {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class);
private final Consumer<Boolean> callback;
private final Consumer<Double> progressCheckCallback;
private final Instant expiryTime;
private final ExecutorService executor;
// This lock protects all the non-final members
Expand All @@ -34,12 +38,25 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet {
private final Map<EventHandle, AtomicInteger> pendingAcknowledgments;
private Future<?> callbackFuture;
private final DefaultAcknowledgementSetMetrics metrics;
private ScheduledFuture<?> progressCheckFuture;
private boolean completed;
private AtomicInteger totalEventsAdded;

public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) {
public DefaultAcknowledgementSet(final ExecutorService executor,
final ScheduledExecutorService scheduledExecutor,
final Consumer<Boolean> callback,
final Duration expiryTime,
final Consumer<Double> progressCheckCallback,
final Duration progressCheckInterval,
final DefaultAcknowledgementSetMetrics metrics) {
this.callback = callback;
this.progressCheckCallback = progressCheckCallback;
this.result = true;
this.totalEventsAdded = new AtomicInteger(0);
this.executor = executor;
if (scheduledExecutor != null && progressCheckCallback != null && progressCheckInterval != null) {
this.progressCheckFuture = scheduledExecutor.scheduleAtFixedRate(this::checkProgress, 0L, progressCheckInterval.toMillis(), TimeUnit.MILLISECONDS);
}
this.expiryTime = Instant.now().plusMillis(expiryTime.toMillis());
this.callbackFuture = null;
this.metrics = metrics;
Expand All @@ -48,6 +65,23 @@ public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<
lock = new ReentrantLock(true);
}

public DefaultAcknowledgementSet(final ExecutorService executor,
final Consumer<Boolean> callback,
final Duration expiryTime,
final DefaultAcknowledgementSetMetrics metrics) {
this(executor, null, callback, expiryTime, null, null, metrics);
}

public void checkProgress() {
lock.lock();
int numberOfEventsPending = pendingAcknowledgments.size();
lock.unlock();
if (progressCheckCallback != null) {
System.out.println("===progressCheck===="+numberOfEventsPending +"---"+totalEventsAdded.get());
progressCheckCallback.accept((double)numberOfEventsPending/totalEventsAdded.get());
}
}

@Override
public void add(Event event) {
lock.lock();
Expand All @@ -56,6 +90,7 @@ public void add(Event event) {
EventHandle eventHandle = new DefaultEventHandle(this);
((JacksonEvent) event).setEventHandle(eventHandle);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -84,6 +119,9 @@ public boolean isDone() {
return true;
}
if (Instant.now().isAfter(expiryTime)) {
if (progressCheckFuture != null) {
progressCheckFuture.cancel(false);
}
if (callbackFuture != null) {
callbackFuture.cancel(true);
callbackFuture = null;
Expand All @@ -108,6 +146,10 @@ public void complete() {
try {
completed = true;
if (pendingAcknowledgments.size() == 0) {
if (progressCheckFuture != null) {
progressCheckFuture.cancel(false);
}
System.out.println("2=====Calling ack callback====");
callbackFuture = executor.submit(() -> callback.accept(this.result));
}
} finally {
Expand All @@ -117,6 +159,7 @@ public void complete() {

@Override
public boolean release(final EventHandle eventHandle, final boolean result) {
System.out.println("===release===="+eventHandle);
lock.lock();
// Result indicates negative or positive acknowledgement. Even if one of the
// events in the set report negative acknowledgement, then the end result
Expand All @@ -132,6 +175,10 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) {
pendingAcknowledgments.remove(eventHandle);
if (completed && pendingAcknowledgments.size() == 0) {
if (progressCheckFuture != null) {
progressCheckFuture.cancel(false);
}
System.out.println("1=====Calling ack callback====");
callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
} else if (pendingAcknowledgments.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

@Named
public class DefaultAcknowledgementSetManager implements AcknowledgementSetManager {
private static final int DEFAULT_WAIT_TIME_MS = 15 * 1000;
private final AcknowledgementSetMonitor acknowledgementSetMonitor;
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
private final AcknowledgementSetMonitorThread acknowledgementSetMonitorThread;
private PluginMetrics pluginMetrics;
private DefaultAcknowledgementSetMetrics metrics;
Expand All @@ -36,12 +39,23 @@ public DefaultAcknowledgementSetManager(
public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor, final Duration waitTime) {
this.acknowledgementSetMonitor = new AcknowledgementSetMonitor();
this.executor = Objects.requireNonNull(callbackExecutor);
// Single thread executor should be sufficient in most cases (esp if the underlying host has 1 or 2 cores)
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
acknowledgementSetMonitorThread = new AcknowledgementSetMonitorThread(acknowledgementSetMonitor, waitTime);
acknowledgementSetMonitorThread.start();
pluginMetrics = PluginMetrics.fromNames("acknowledgementSetManager", "acknowledgements");
metrics = new DefaultAcknowledgementSetMetrics(pluginMetrics);
}

public AcknowledgementSet create(final Consumer<Double> progressCheckCallback, final Duration progressCheckInterval, final Consumer<Boolean> callback, final Duration timeout) {
AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(executor, scheduledExecutor,
callback, timeout,
progressCheckCallback, progressCheckInterval, metrics);
acknowledgementSetMonitor.add(acknowledgementSet);
metrics.increment(DefaultAcknowledgementSetMetrics.CREATED_METRIC_NAME);
return acknowledgementSet;
}

public AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout) {
AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(executor, callback, timeout, metrics);
acknowledgementSetMonitor.add(acknowledgementSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
throw new UnsupportedOperationException("create operation not supported");
}

public AcknowledgementSet create(final Consumer<Double> progressCheckCallback, final Duration progressCheckInterval, final Consumer<Boolean> callback, final Duration timeout) {
throw new UnsupportedOperationException("create operation not supported");
}

public void acquireEventReference(final Event event) {
throw new UnsupportedOperationException("acquire operation not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ class DefaultAcknowledgementSetManagerTests {
EventHandle eventHandle1;
EventHandle eventHandle2;
EventHandle eventHandle3;
EventHandle eventHandle4;
EventHandle eventHandle5;
EventHandle eventHandle6;
Boolean result;
double currentRatio;

@BeforeEach
void setup() {
currentRatio = 0;
callbackExecutor = Executors.newFixedThreadPool(2);
event1 = mock(JacksonEvent.class);
doAnswer((i) -> {
Expand All @@ -73,6 +78,62 @@ DefaultAcknowledgementSetManager createObjectUnderTest() {
return new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(TEST_TIMEOUT.toMillis() * 2));
}

@Test
void testWithProgressCheckCallbacks() {
event3 = mock(JacksonEvent.class);
doAnswer((i) -> {
eventHandle3 = i.getArgument(0);
return null;
}).when(event3).setEventHandle(any());
lenient().when(event3.getEventHandle()).thenReturn(eventHandle3);

JacksonEvent event4 = mock(JacksonEvent.class);
doAnswer((i) -> {
eventHandle4 = i.getArgument(0);
return null;
}).when(event4).setEventHandle(any());
lenient().when(event4.getEventHandle()).thenReturn(eventHandle4);

JacksonEvent event5 = mock(JacksonEvent.class);
doAnswer((i) -> {
eventHandle5 = i.getArgument(0);
return null;
}).when(event5).setEventHandle(any());
lenient().when(event5.getEventHandle()).thenReturn(eventHandle5);

JacksonEvent event6 = mock(JacksonEvent.class);
doAnswer((i) -> {
eventHandle6 = i.getArgument(0);
return null;
}).when(event6).setEventHandle(any());
lenient().when(event6.getEventHandle()).thenReturn(eventHandle6);

AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((ratio) -> {currentRatio = ratio;}, Duration.ofSeconds(1), (flag) -> { result = flag; }, Duration.ofMillis(10000));
acknowledgementSet2.add(event3);
acknowledgementSet2.add(event4);
acknowledgementSet2.add(event5);
acknowledgementSet2.add(event6);
acknowledgementSet2.complete();
acknowledgementSetManager.releaseEventReference(eventHandle3, true);
await().atMost(TEST_TIMEOUT.multipliedBy(5))
.untilAsserted(() -> {
assertThat(currentRatio, equalTo(0.75));
});
acknowledgementSetManager.releaseEventReference(eventHandle4, true);
await().atMost(TEST_TIMEOUT.multipliedBy(5))
.untilAsserted(() -> {
assertThat(currentRatio, equalTo(0.5));
});
acknowledgementSetManager.releaseEventReference(eventHandle5, true);
await().atMost(TEST_TIMEOUT.multipliedBy(5))
.untilAsserted(() -> {
assertThat(currentRatio, equalTo(0.25));
});
acknowledgementSetManager.releaseEventReference(eventHandle6, true);
assertThat(result, equalTo(true));

}

@Test
void testBasic() {
acknowledgementSetManager.releaseEventReference(eventHandle2, true);
Expand Down
Loading

0 comments on commit c486f01

Please sign in to comment.