Skip to content

Commit

Permalink
Add requestProgress function to Watch
Browse files Browse the repository at this point in the history
Added a function that requests a watch stream progress status
be sent in the watch response stream as soon as possible. This is
helpful in situations where an application may want to check the
progress of a watch to determine how up-to-date the watch stream is.

Addresses etcd-io#928
  • Loading branch information
juliankung-db committed Jul 29, 2021
1 parent 580f269 commit eef02e7
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 3 deletions.
11 changes: 10 additions & 1 deletion docs/Watch.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ The Watch provide methods to watch on a key interval and cancel a watcher. If th

4. Cancel watch request, the etcd client should process watch cancellation and filter all the notification after cancellation request.

5. The watch client should be able to make a progress notify request that propagates the latest revision number to all watches

# Implementation

The etcd client process watch request with [watch function](#watch-function), process notification with [processEvents function](#processevents-function) , process resume with [resume function](#resume-function) and process cancel with [cancelWatch function](#cancelwatch-function).
The etcd client process watch request with [watch function](#watch-function), process notification with [processEvents function](#processevents-function) , process resume with [resume function](#resume-function), process cancel with [cancelWatch function](#cancelwatch-function) and process progress with [requestProgress function](#requestProgress-function).

## watch function

Expand Down Expand Up @@ -44,6 +46,13 @@ Cancel the watch task with the watcher, the `onCanceled` will be called after su
1. The watcher will be removed from [watchers](#watchers-instance) map.
2. If the [watchers](#watchers-instance) map contain the watcher, it will be moved to [cancelWatchers](#cancelwatchers) and send cancel request to [requestStream](#requeststream-instance).

## requestProgress function

Send the most updated revision number to all active [watchers](#watchers-instance)

1. Send a progress request to [requestStream](#requeststream-instance).
2. Working watchers will receive a WatchResponse containing the latest revision number. All future revision numbers are guaranteed to be greater than or equal to the received revision number.

## requestStream instance

StreamObserver instance
Expand Down
10 changes: 10 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/Watch.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ default Watcher watch(ByteSequence key, WatchOption option, Consumer<WatchRespon
return watch(key, option, listener(onNext, onError, onCompleted));
}

/**
* Requests the latest revision for all watcher instances
*/
void requestProgress();

static Listener listener(Consumer<WatchResponse> onNext) {
return listener(onNext, t -> {
}, () -> {
Expand Down Expand Up @@ -205,5 +210,10 @@ interface Watcher extends Closeable {
*/
@Override
void close();

/**
* Requests the most updated revision and propagates it to listeners
*/
void requestProgress();
}
}
21 changes: 21 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchProgressRequest;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ErrorCode;
Expand Down Expand Up @@ -98,6 +99,15 @@ public void close() {
}
}

@Override
public void requestProgress() {
if (!closed.get()) {
synchronized (this.lock) {
watchers.forEach(Watcher::requestProgress);
}
}
}

final class WatcherImpl implements Watcher, StreamObserver<WatchResponse> {
private final ByteSequence key;
private final WatchOption option;
Expand Down Expand Up @@ -192,6 +202,14 @@ public void close() {
}
}

@Override
public void requestProgress() {
if (!closed.get() && stream != null) {
WatchProgressRequest watchProgressRequest = WatchProgressRequest.newBuilder().build();
stream.onNext(WatchRequest.newBuilder().setProgressRequest(watchProgressRequest).build());
}
}

// ************************
//
// StreamObserver
Expand Down Expand Up @@ -245,6 +263,9 @@ public void onNext(WatchResponse response) {
}

handleError(toEtcdException(error), false);
} else if (io.etcd.jetcd.watch.WatchResponse.isProgressNotify(response)) {
listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response));
revision = Math.max(revision, response.getHeader().getRevision());
} else if (response.getEventsCount() == 0 && option.isProgressNotify()) {

//
Expand Down
12 changes: 12 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/watch/WatchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,16 @@ public synchronized List<WatchEvent> getEvents() {

return events;
}

public boolean isProgressNotify() {
return isProgressNotify(getResponse());
}

/**
* returns true if the WatchResponse is progress notification.
*/
public static boolean isProgressNotify(io.etcd.jetcd.api.WatchResponse response) {
return response.getEventsCount() == 0 && !response.getCreated() && !response.getCanceled()
&& response.getCompactRevision() == 0 && response.getHeader().getRevision() != 0;
}
}
2 changes: 1 addition & 1 deletion jetcd-core/src/main/proto/kv.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2016-2020 The jetcd authors
// Copyright 2016-2021 The jetcd authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
18 changes: 17 additions & 1 deletion jetcd-core/src/main/proto/rpc.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2016-2020 The jetcd authors
// Copyright 2016-2021 The jetcd authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,13 @@ service KV {
}

service Watch {
// Progress requests that a watch stream progress status
// be sent in the watch response stream as soon as possible.
// For watch progress responses, the header.revision indicates progress. All future events
// received in this stream are guaranteed to have a higher revision number than the
// header.revision number.
rpc Progress(WatchProgressRequest) returns (WatchResponse) {}

// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
Expand Down Expand Up @@ -175,6 +182,9 @@ message ResponseHeader {
// member_id is the ID of the member which sent the response.
uint64 member_id = 2;
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// recieved in this stream are guaranteed to have a higher revision number than the
// header.revision number.
int64 revision = 3;
// raft_term is the raft term when the request was applied.
uint64 raft_term = 4;
Expand Down Expand Up @@ -458,6 +468,7 @@ message WatchRequest {
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
WatchProgressRequest progress_request = 3;
}
}

Expand Down Expand Up @@ -497,6 +508,11 @@ message WatchCancelRequest {
int64 watch_id = 1;
}

// Requests the a watch stream progress status be sent in the
// watch response stream as soon as possible.
message WatchProgressRequest {
}

message WatchResponse {
ResponseHeader header = 1;
// watch_id is the ID of the watcher that corresponds to the response.
Expand Down
53 changes: 53 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,59 @@ public void testWatchClose(final Client client) throws Exception {
assertThat(events.get(0).getEvents().get(0).getKeyValue().getValue()).isEqualTo(value);
}

@ParameterizedTest
@MethodSource("parameters")
public void testProgressRequest(final Client client) throws Exception {
final ByteSequence key = randomByteSequence();
final ByteSequence value = randomByteSequence();
final Watch watchClient = client.getWatchClient();
final AtomicReference<WatchResponse> emptyWatcherEventRef = new AtomicReference<>();
final AtomicReference<WatchResponse> activeWatcherEventRef = new AtomicReference<>();

try (Watcher activeWatcher = watchClient.watch(key, activeWatcherEventRef::set);
Watcher emptyWatcher = watchClient.watch(key.concat(randomByteSequence()), emptyWatcherEventRef::set)) {
// Check that a requestProgress returns identical revisions initially
watchClient.requestProgress();
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(activeWatcherEventRef.get()).isNotNull();
assertThat(emptyWatcherEventRef.get()).isNotNull();
});
WatchResponse activeEvent = activeWatcherEventRef.get();
WatchResponse emptyEvent = emptyWatcherEventRef.get();
assertThat(activeEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(emptyEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(activeEvent.getHeader().getRevision()).isEqualTo(emptyEvent.getHeader().getRevision());

// Put a value being watched by only the active watcher
activeWatcherEventRef.set(null);
emptyWatcherEventRef.set(null);
client.getKVClient().put(key, value).get();
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(activeWatcherEventRef.get()).isNotNull();
});
activeEvent = activeWatcherEventRef.get();
emptyEvent = emptyWatcherEventRef.get();
assertThat(emptyEvent).isNull();
assertThat(activeEvent).isNotNull();
long latestRevision = activeEvent.getHeader().getRevision();

// verify the next progress notify brings both watchers to the latest revision
activeWatcherEventRef.set(null);
emptyWatcherEventRef.set(null);
watchClient.requestProgress();
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(activeWatcherEventRef.get()).isNotNull();
assertThat(emptyWatcherEventRef.get()).isNotNull();
});
activeEvent = activeWatcherEventRef.get();
emptyEvent = emptyWatcherEventRef.get();
assertThat(activeEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(emptyEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(activeEvent.getHeader().getRevision()).isEqualTo(emptyEvent.getHeader().getRevision())
.isEqualTo(latestRevision);
}
}

@ParameterizedTest
@MethodSource("parameters")
public void testWatchFutureRevisionIsNotOverwrittenOnCreation(final Client client) throws Exception {
Expand Down

0 comments on commit eef02e7

Please sign in to comment.