diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java index 958f82761..848ff2654 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java @@ -244,7 +244,7 @@ public void onNext(WatchResponse response) { error = newEtcdException(ErrorCode.FAILED_PRECONDITION, reason); } - listener.onError(error); + handleError(toEtcdException(error), false); } else if (response.getEventsCount() == 0 && option.isProgressNotify()) { // diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java index 8b01b7906..f3e7590c5 100755 --- a/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java @@ -152,17 +152,10 @@ public void testWatchOnDelete(final Client client) throws Exception { @MethodSource("parameters") public void testWatchCompacted(final Client client) throws Exception { final ByteSequence key = randomByteSequence(); - final ByteSequence value = randomByteSequence(); - - // Insert key twice to ensure we have at least two revisions - client.getKVClient().put(key, value).get(); - final PutResponse putResponse = client.getKVClient().put(key, value).get(); - // Compact until latest revision - client.getKVClient().compact(putResponse.getHeader().getRevision()).get(); final AtomicReference ref = new AtomicReference<>(); // Try to listen from previous revision on - final WatchOption options = WatchOption.newBuilder().withRevision(putResponse.getHeader().getRevision() - 1).build(); + final WatchOption options = WatchOption.newBuilder().withRevision(getCompactedRevision(client, key)).build(); final Watch wc = client.getWatchClient(); try (Watcher watcher = wc.watch(key, options, Watch.listener(TestUtil::noOpWatchResponseConsumer, ref::set))) { @@ -212,4 +205,40 @@ public void testWatchFutureRevisionIsNotOverwrittenOnCreation(final Client clien assertThat(events.isEmpty()).as("verify that received events list is empty").isTrue(); } } + + private static long getCompactedRevision(final Client client, final ByteSequence key) throws Exception { + final ByteSequence value = randomByteSequence(); + + // Insert key twice to ensure we have at least two revisions + client.getKVClient().put(key, value).get(); + final PutResponse putResponse = client.getKVClient().put(key, value).get(); + // Compact until latest revision + client.getKVClient().compact(putResponse.getHeader().getRevision()).get(); + + return putResponse.getHeader().getRevision() - 1; + } + + @ParameterizedTest + @MethodSource("parameters") + public void testCancelledWatchGetsClosed(final Client client) throws Exception { + final ByteSequence key = randomByteSequence(); + final Watch wc = client.getWatchClient(); + + long revision = getCompactedRevision(client, key); + final WatchOption options = WatchOption.newBuilder().withRevision(revision).build(); + + final AtomicReference ref = new AtomicReference<>(); + final AtomicReference completed = new AtomicReference<>(); + + Watch.Listener listener = Watch.listener(TestUtil::noOpWatchResponseConsumer, ref::set, () -> { + completed.set(Boolean.TRUE); + }); + + try (Watcher watcher = wc.watch(key, options, listener)) { + await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isNotNull()); + assertThat(ref.get().getClass()).isEqualTo(CompactedException.class); + assertThat(completed.get()).isNotNull(); + assertThat(completed.get()).isEqualTo(Boolean.TRUE); + } + } }