Skip to content

Commit

Permalink
Close cancelled watchers
Browse files Browse the repository at this point in the history
A cancelled watcher is effectively dead and will not receive any more
events so it's helpful for users if the watcher gets closed
automatically.

Note that we don't want to reschedule the watcher since users may want
to take corrective action first, such as finding the latest revision if
the watcher was cancelled due to CompactedException.
  • Loading branch information
mfleming authored and lburgazzoli committed Jul 16, 2021
1 parent f2c43bb commit 580f269
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
2 changes: 1 addition & 1 deletion jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void onNext(WatchResponse response) {
error = newEtcdException(ErrorCode.FAILED_PRECONDITION, reason);
}

listener.onError(error);
handleError(toEtcdException(error), false);
} else if (response.getEventsCount() == 0 && option.isProgressNotify()) {

//
Expand Down
45 changes: 37 additions & 8 deletions jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,10 @@ public void testWatchOnDelete(final Client client) throws Exception {
@MethodSource("parameters")
public void testWatchCompacted(final Client client) throws Exception {
final ByteSequence key = randomByteSequence();
final ByteSequence value = randomByteSequence();

// Insert key twice to ensure we have at least two revisions
client.getKVClient().put(key, value).get();
final PutResponse putResponse = client.getKVClient().put(key, value).get();
// Compact until latest revision
client.getKVClient().compact(putResponse.getHeader().getRevision()).get();

final AtomicReference<Throwable> ref = new AtomicReference<>();
// Try to listen from previous revision on
final WatchOption options = WatchOption.newBuilder().withRevision(putResponse.getHeader().getRevision() - 1).build();
final WatchOption options = WatchOption.newBuilder().withRevision(getCompactedRevision(client, key)).build();
final Watch wc = client.getWatchClient();

try (Watcher watcher = wc.watch(key, options, Watch.listener(TestUtil::noOpWatchResponseConsumer, ref::set))) {
Expand Down Expand Up @@ -212,4 +205,40 @@ public void testWatchFutureRevisionIsNotOverwrittenOnCreation(final Client clien
assertThat(events.isEmpty()).as("verify that received events list is empty").isTrue();
}
}

private static long getCompactedRevision(final Client client, final ByteSequence key) throws Exception {
final ByteSequence value = randomByteSequence();

// Insert key twice to ensure we have at least two revisions
client.getKVClient().put(key, value).get();
final PutResponse putResponse = client.getKVClient().put(key, value).get();
// Compact until latest revision
client.getKVClient().compact(putResponse.getHeader().getRevision()).get();

return putResponse.getHeader().getRevision() - 1;
}

@ParameterizedTest
@MethodSource("parameters")
public void testCancelledWatchGetsClosed(final Client client) throws Exception {
final ByteSequence key = randomByteSequence();
final Watch wc = client.getWatchClient();

long revision = getCompactedRevision(client, key);
final WatchOption options = WatchOption.newBuilder().withRevision(revision).build();

final AtomicReference<Throwable> ref = new AtomicReference<>();
final AtomicReference<Boolean> completed = new AtomicReference<>();

Watch.Listener listener = Watch.listener(TestUtil::noOpWatchResponseConsumer, ref::set, () -> {
completed.set(Boolean.TRUE);
});

try (Watcher watcher = wc.watch(key, options, listener)) {
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isNotNull());
assertThat(ref.get().getClass()).isEqualTo(CompactedException.class);
assertThat(completed.get()).isNotNull();
assertThat(completed.get()).isEqualTo(Boolean.TRUE);
}
}
}

0 comments on commit 580f269

Please sign in to comment.