From 157cf384104d201dc29b21686066be7ffaccda2d Mon Sep 17 00:00:00 2001 From: Julian Kung Date: Wed, 28 Jul 2021 08:46:53 -0700 Subject: [PATCH] Add requestProgress function to Watch Added a function that requests a watch stream progress status be sent in the watch response stream as soon as possible. This is helpful in situations where an application may want to check the progress of a watch to determine how up-to-date the watch stream is. Addresses #928 --- docs/Watch.md | 11 +++- .../src/main/java/io/etcd/jetcd/Watch.java | 10 ++++ .../main/java/io/etcd/jetcd/WatchImpl.java | 21 ++++++++ .../io/etcd/jetcd/watch/WatchResponse.java | 12 +++++ jetcd-core/src/main/proto/kv.proto | 2 +- jetcd-core/src/main/proto/rpc.proto | 18 ++++++- .../test/java/io/etcd/jetcd/WatchTest.java | 53 +++++++++++++++++++ 7 files changed, 124 insertions(+), 3 deletions(-) diff --git a/docs/Watch.md b/docs/Watch.md index 30fa4a744..1ee03bfd7 100644 --- a/docs/Watch.md +++ b/docs/Watch.md @@ -12,9 +12,11 @@ The Watch provide methods to watch on a key interval and cancel a watcher. If th 4. Cancel watch request, the etcd client should process watch cancellation and filter all the notification after cancellation request. +5. The watch client should be able to make a progress notify request that propagates the latest revision number to all watches + # Implementation -The etcd client process watch request with [watch function](#watch-function), process notification with [processEvents function](#processevents-function) , process resume with [resume function](#resume-function) and process cancel with [cancelWatch function](#cancelwatch-function). +The etcd client process watch request with [watch function](#watch-function), process notification with [processEvents function](#processevents-function) , process resume with [resume function](#resume-function), process cancel with [cancelWatch function](#cancelwatch-function) and request progress with [requestProgress function](#requestProgress-function). ## watch function @@ -44,6 +46,13 @@ Cancel the watch task with the watcher, the `onCanceled` will be called after su 1. The watcher will be removed from [watchers](#watchers-instance) map. 2. If the [watchers](#watchers-instance) map contain the watcher, it will be moved to [cancelWatchers](#cancelwatchers) and send cancel request to [requestStream](#requeststream-instance). +## requestProgress function + +Send the latest revision processed to all active [watchers](#watchers-instance) + +1. Send a progress request to [requestStream](#requeststream-instance). +2. Working watchers will receive a WatchResponse containing the latest revision number. All future revision numbers are guaranteed to be greater than or equal to the received revision number. + ## requestStream instance StreamObserver instance diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/Watch.java b/jetcd-core/src/main/java/io/etcd/jetcd/Watch.java index 1909702e1..f59aa5f8e 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/Watch.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/Watch.java @@ -140,6 +140,11 @@ default Watcher watch(ByteSequence key, WatchOption option, Consumer onNext) { return listener(onNext, t -> { }, () -> { @@ -205,5 +210,10 @@ interface Watcher extends Closeable { */ @Override void close(); + + /** + * Requests the latest revision processed and propagates it to listeners + */ + void requestProgress(); } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java index 848ff2654..ba9529f1f 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java @@ -30,6 +30,7 @@ import io.etcd.jetcd.api.WatchCancelRequest; import io.etcd.jetcd.api.WatchCreateRequest; import io.etcd.jetcd.api.WatchGrpc; +import io.etcd.jetcd.api.WatchProgressRequest; import io.etcd.jetcd.api.WatchRequest; import io.etcd.jetcd.api.WatchResponse; import io.etcd.jetcd.common.exception.ErrorCode; @@ -98,6 +99,15 @@ public void close() { } } + @Override + public void requestProgress() { + if (!closed.get()) { + synchronized (this.lock) { + watchers.forEach(Watcher::requestProgress); + } + } + } + final class WatcherImpl implements Watcher, StreamObserver { private final ByteSequence key; private final WatchOption option; @@ -192,6 +202,14 @@ public void close() { } } + @Override + public void requestProgress() { + if (!closed.get() && stream != null) { + WatchProgressRequest watchProgressRequest = WatchProgressRequest.newBuilder().build(); + stream.onNext(WatchRequest.newBuilder().setProgressRequest(watchProgressRequest).build()); + } + } + // ************************ // // StreamObserver @@ -245,6 +263,9 @@ public void onNext(WatchResponse response) { } handleError(toEtcdException(error), false); + } else if (io.etcd.jetcd.watch.WatchResponse.isProgressNotify(response)) { + listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response)); + revision = Math.max(revision, response.getHeader().getRevision()); } else if (response.getEventsCount() == 0 && option.isProgressNotify()) { // diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/watch/WatchResponse.java b/jetcd-core/src/main/java/io/etcd/jetcd/watch/WatchResponse.java index 939816b05..69c92f31a 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/watch/WatchResponse.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/watch/WatchResponse.java @@ -67,4 +67,16 @@ public synchronized List getEvents() { return events; } + + public boolean isProgressNotify() { + return isProgressNotify(getResponse()); + } + + /** + * returns true if the WatchResponse is progress notification. + */ + public static boolean isProgressNotify(io.etcd.jetcd.api.WatchResponse response) { + return response.getEventsCount() == 0 && !response.getCreated() && !response.getCanceled() + && response.getCompactRevision() == 0 && response.getHeader().getRevision() != 0; + } } diff --git a/jetcd-core/src/main/proto/kv.proto b/jetcd-core/src/main/proto/kv.proto index 334dc4ecb..c8135fa84 100644 --- a/jetcd-core/src/main/proto/kv.proto +++ b/jetcd-core/src/main/proto/kv.proto @@ -1,5 +1,5 @@ // -// Copyright 2016-2020 The jetcd authors +// Copyright 2016-2021 The jetcd authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/jetcd-core/src/main/proto/rpc.proto b/jetcd-core/src/main/proto/rpc.proto index b38e423e9..ec187390a 100644 --- a/jetcd-core/src/main/proto/rpc.proto +++ b/jetcd-core/src/main/proto/rpc.proto @@ -1,5 +1,5 @@ // -// Copyright 2016-2020 The jetcd authors +// Copyright 2016-2021 The jetcd authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -53,6 +53,13 @@ service KV { } service Watch { + // Progress requests that a watch stream progress status + // be sent in the watch response stream as soon as possible. + // For watch progress responses, the header.revision indicates progress. All future events + // received in this stream are guaranteed to have a higher revision number than the + // header.revision number. + rpc Progress(WatchProgressRequest) returns (WatchResponse) {} + // Watch watches for events happening or that have happened. Both input and output // are streams; the input stream is for creating and canceling watchers and the output // stream sends events. One watch RPC can watch on multiple key ranges, streaming events @@ -175,6 +182,9 @@ message ResponseHeader { // member_id is the ID of the member which sent the response. uint64 member_id = 2; // revision is the key-value store revision when the request was applied. + // For watch progress responses, the header.revision indicates progress. All future events + // recieved in this stream are guaranteed to have a higher revision number than the + // header.revision number. int64 revision = 3; // raft_term is the raft term when the request was applied. uint64 raft_term = 4; @@ -458,6 +468,7 @@ message WatchRequest { oneof request_union { WatchCreateRequest create_request = 1; WatchCancelRequest cancel_request = 2; + WatchProgressRequest progress_request = 3; } } @@ -497,6 +508,11 @@ message WatchCancelRequest { int64 watch_id = 1; } +// Requests a watch stream progress status be sent in the +// watch response stream as soon as possible. +message WatchProgressRequest { +} + message WatchResponse { ResponseHeader header = 1; // watch_id is the ID of the watcher that corresponds to the response. diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java index f3e7590c5..873eac77f 100755 --- a/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java @@ -185,6 +185,59 @@ public void testWatchClose(final Client client) throws Exception { assertThat(events.get(0).getEvents().get(0).getKeyValue().getValue()).isEqualTo(value); } + @ParameterizedTest + @MethodSource("parameters") + public void testProgressRequest(final Client client) throws Exception { + final ByteSequence key = randomByteSequence(); + final ByteSequence value = randomByteSequence(); + final Watch watchClient = client.getWatchClient(); + final AtomicReference emptyWatcherEventRef = new AtomicReference<>(); + final AtomicReference activeWatcherEventRef = new AtomicReference<>(); + + try (Watcher activeWatcher = watchClient.watch(key, activeWatcherEventRef::set); + Watcher emptyWatcher = watchClient.watch(key.concat(randomByteSequence()), emptyWatcherEventRef::set)) { + // Check that a requestProgress returns identical revisions initially + watchClient.requestProgress(); + await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(activeWatcherEventRef.get()).isNotNull(); + assertThat(emptyWatcherEventRef.get()).isNotNull(); + }); + WatchResponse activeEvent = activeWatcherEventRef.get(); + WatchResponse emptyEvent = emptyWatcherEventRef.get(); + assertThat(activeEvent).satisfies(WatchResponse::isProgressNotify); + assertThat(emptyEvent).satisfies(WatchResponse::isProgressNotify); + assertThat(activeEvent.getHeader().getRevision()).isEqualTo(emptyEvent.getHeader().getRevision()); + + // Put a value being watched by only the active watcher + activeWatcherEventRef.set(null); + emptyWatcherEventRef.set(null); + client.getKVClient().put(key, value).get(); + await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(activeWatcherEventRef.get()).isNotNull(); + }); + activeEvent = activeWatcherEventRef.get(); + emptyEvent = emptyWatcherEventRef.get(); + assertThat(emptyEvent).isNull(); + assertThat(activeEvent).isNotNull(); + long latestRevision = activeEvent.getHeader().getRevision(); + + // verify the next progress notify brings both watchers to the latest revision + activeWatcherEventRef.set(null); + emptyWatcherEventRef.set(null); + watchClient.requestProgress(); + await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(activeWatcherEventRef.get()).isNotNull(); + assertThat(emptyWatcherEventRef.get()).isNotNull(); + }); + activeEvent = activeWatcherEventRef.get(); + emptyEvent = emptyWatcherEventRef.get(); + assertThat(activeEvent).satisfies(WatchResponse::isProgressNotify); + assertThat(emptyEvent).satisfies(WatchResponse::isProgressNotify); + assertThat(activeEvent.getHeader().getRevision()).isEqualTo(emptyEvent.getHeader().getRevision()) + .isEqualTo(latestRevision); + } + } + @ParameterizedTest @MethodSource("parameters") public void testWatchFutureRevisionIsNotOverwrittenOnCreation(final Client client) throws Exception {