Skip to content

Commit

Permalink
Communicate the Zxid that triggered a WatchEvent to fire
Browse files Browse the repository at this point in the history
With the recent addition of persistent watches, many doors have opened up to significantly more
performant and intuitive local caches of remote state, but the actual implementation can be
difficult because to cache data locally, one needs to execute the following steps:

1. Set the watch
2. Bootstrap the watched subtree
3. Catch up on the events that fired during the bootstrap

The issue is it's now very difficult to deduplicate and sanely resolve the remote state during step
3 because it's unknown whether an event arrived during the bootstrap or after. For example,
imagine that between steps 1 and 2, a node /a was deleted then re-created. By the time step 3 is
executed, there will be a NodeDeleted event queued up followed by a NodeCreated, causing at best a
double read (one from the bootstrap, one from the NodeCreated) or at worst some data inconsistencies
in the local cache.

This change sets the Zxid in the response header whenever the watch event type is NodeCreated,
NodeDeleted, NodeDataChanged or NodeChildrenChanged.
  • Loading branch information
PapaCharlie committed Mar 8, 2023
1 parent 84afb8c commit 13bd711
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,6 @@ Persistent watches are set using the method *addWatch()*. The triggering semanti
(other than one-time triggering) are the same as standard watches. The only exception regarding events is that
recursive persistent watchers never trigger child changed events as they are redundant.
Persistent watches are removed using *removeWatches()* with watcher type *WatcherType.Any*.
<a name="sc_WatchRemoval"></a>
### Remove Watches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}

Expand Down Expand Up @@ -225,7 +225,7 @@ public void tearDown() {

// clear all the watches
for (String path : paths) {
watchManager.triggerWatch(path, event);
watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ public void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
event.setPath(clientPath);
}

WatchedEvent we = new WatchedEvent(event);
WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid());
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,38 @@
*/
@InterfaceAudience.Public
public class WatchedEvent {
public static final long NO_ZXID = -1L;

private final KeeperState keeperState;
private final EventType eventType;
private String path;
private final String path;
private final long zxid;

/**
* Create a WatchedEvent with specified type, state and path
* Create a WatchedEvent with specified type, state, path and zxid
*/
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) {
this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
this.zxid = zxid;
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent
* Create a WatchedEvent with specified type, state and path
*/
public WatchedEvent(WatcherEvent eventMessage) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
this(eventType, keeperState, path, NO_ZXID);
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent
*/
public WatchedEvent(WatcherEvent eventMessage, long zxid) {
keeperState = KeeperState.fromInt(eventMessage.getState());
eventType = EventType.fromInt(eventMessage.getType());
path = eventMessage.getPath();
this.zxid = zxid;
}

public KeeperState getState() {
Expand All @@ -66,9 +77,24 @@ public String getPath() {
return path;
}

/**
* Returns the zxid of the transaction that triggered this watch if it is
* of one of the following types:<ul>
* <li>{@link EventType#NodeCreated}</li>
* <li>{@link EventType#NodeDeleted}</li>
* <li>{@link EventType#NodeDataChanged}</li>
* <li>{@link EventType#NodeChildrenChanged}</li>
* </ul>
* Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also
* returned by old servers that do not support this feature.
*/
public long getZxid() {
return zxid;
}

@Override
public String toString() {
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path;
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid;
}

/**
Expand All @@ -77,5 +103,4 @@ public String toString() {
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
* server it connects to. An application using such a client handles these
* events by registering a callback object with the client. The callback object
* is expected to be an instance of a class that implements Watcher interface.
* When {@link #process} is triggered by a watch firing, such as
* {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will
* return the zxid of the transaction that caused said watch to fire. If
* {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to
* support this feature.
*
*/
@InterfaceAudience.Public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
updateQuotaStat(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}

/**
Expand Down Expand Up @@ -615,9 +615,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException {
"childWatches.triggerWatch " + parentName);
}

WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
Expand Down Expand Up @@ -649,7 +649,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData));

updateWriteStat(path, dataBytes);
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
return s;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
public class DumbWatcher extends ServerCnxn {

private long sessionId;
private String mostRecentPath;
private Event.EventType mostRecentEventType;
private long mostRecentZxid = WatchedEvent.NO_ZXID;

public DumbWatcher() {
this(0);
Expand All @@ -49,8 +52,24 @@ void setSessionTimeout(int sessionTimeout) {

@Override
public void process(WatchedEvent event) {
mostRecentEventType = event.getType();
mostRecentZxid = event.getZxid();
mostRecentPath = event.getPath();
}

public String getMostRecentPath() {
return mostRecentPath;
}

public Event.EventType getMostRecentEventType() {
return mostRecentEventType;
}

public long getMostRecentZxid() {
return mostRecentZxid;
}


@Override
int getSessionTimeout() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St
*/
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public int getSessionTimeout() {

@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,24 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid);

/**
* Distribute the watch event for the given path, but ignore those
* suppressed ones.
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
* @param suppress the suppressed watcher set
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress);

/**
* Get the size of watchers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ public void processDeadWatchers(Set<Integer> deadWatchers) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);

BitHashSet watchers = remove(path);
if (watchers == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public WatcherTriggerWorker(
public void run() {
while (!stopped) {
String path = PATH_PREFIX + r.nextInt(paths);
WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted);
WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1);
if (s != null) {
triggeredCount.addAndGet(s.size());
}
Expand Down Expand Up @@ -416,6 +416,12 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon
assertEquals(sum, values.get("sum_" + metricName));
}

private void checkMostRecentWatchedEvent(DumbWatcher watcher, String path, EventType eventType, long zxid) {
assertEquals(path, watcher.getMostRecentPath());
assertEquals(eventType, watcher.getMostRecentEventType());
assertEquals(zxid, watcher.getMostRecentZxid());
}

@ParameterizedTest
@MethodSource("data")
public void testWatcherMetrics(String className) throws IOException {
Expand All @@ -430,41 +436,54 @@ public void testWatcherMetrics(String className) throws IOException {

final String path3 = "/path3";

//both wather1 and wather2 are watching path1
//both watcher1 and watcher2 are watching path1
manager.addWatch(path1, watcher1);
manager.addWatch(path1, watcher2);

//path2 is watched by watcher1
manager.addWatch(path2, watcher1);

manager.triggerWatch(path3, EventType.NodeCreated);
manager.triggerWatch(path3, EventType.NodeCreated, 1);
//path3 is not being watched so metric is 0
checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L);
// Watchers shouldn't have received any events yet so the zxid should be -1.
checkMostRecentWatchedEvent(watcher1, null, null, -1);
checkMostRecentWatchedEvent(watcher2, null, null, -1);

//path1 is watched by two watchers so two fired
manager.triggerWatch(path1, EventType.NodeCreated);
manager.triggerWatch(path1, EventType.NodeCreated, 2);
checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L);
checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeCreated, 2);
checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);

//path2 is watched by one watcher so one fired now total is 3
manager.triggerWatch(path2, EventType.NodeCreated);
manager.triggerWatch(path2, EventType.NodeCreated, 3);
checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3);
checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);

//watches on path1 are no longer there so zero fired
manager.triggerWatch(path1, EventType.NodeDataChanged);
manager.triggerWatch(path1, EventType.NodeDataChanged, 4);
checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L);
checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3);
checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);

//both wather1 and wather2 are watching path1
//both watcher and watcher are watching path1
manager.addWatch(path1, watcher1);
manager.addWatch(path1, watcher2);

//path2 is watched by watcher1
manager.addWatch(path2, watcher1);

manager.triggerWatch(path1, EventType.NodeDataChanged);
manager.triggerWatch(path1, EventType.NodeDataChanged, 5);
checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L);
checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeDataChanged, 5);
checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5);

manager.triggerWatch(path2, EventType.NodeDeleted);
manager.triggerWatch(path2, EventType.NodeDeleted, 6);
checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L);
checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeDeleted, 6);
checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5);

//make sure that node created watch count is not impacted by the fire of other event types
checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
Expand Down
Loading

0 comments on commit 13bd711

Please sign in to comment.