Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths #18518

Merged
merged 1 commit into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ protected CompletableFuture<Optional<T>> getAsync(String path) {
return cache.get(path);
}

protected CompletableFuture<Optional<T>> refreshAndGetAsync(String path) {
return store.sync(path).thenCompose(___ -> {
cache.invalidate(path);
return cache.get(path);
});
}

protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
try {
setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,18 @@ public CompletableFuture<List<String>> listPartitionedTopicsAsync(NamespaceName
}

public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
return getPartitionedTopicMetadataAsync(tn, false);
}

public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn,
boolean refresh) {
if (refresh) {
return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
} else {
return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}
}

public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException {
Expand Down Expand Up @@ -317,7 +327,7 @@ public CompletableFuture<Boolean> isPartitionedTopicBeingDeletedAsync(TopicName
if (tn.isPartitioned()) {
tn = TopicName.get(tn.getPartitionedTopicName());
}
return getPartitionedTopicMetadataAsync(tn)
return getPartitionedTopicMetadataAsync(tn, true)
.thenApply(mdOpt -> mdOpt.map(partitionedTopicMetadata -> partitionedTopicMetadata.deleted)
.orElse(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable {
*/
CompletableFuture<Optional<GetResult>> get(String path);


/**
* Ensure that the next value read from the local client will be up-to-date with the latest version of the value
* as it can be seen by all the other clients.
* @param path
* @return a handle to the operation
*/
default CompletableFuture<Void> sync(String path) {
return CompletableFuture.completedFuture(null);
}

/**
* Return all the nodes (lexicographically sorted) that are children to the specific path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,24 @@ protected void receivedSessionEvent(SessionEvent event) {
}
}

@Override
public CompletableFuture<Void> sync(String path) {
CompletableFuture<Void> result = new CompletableFuture<>();
zkc.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object o) {
Code code = Code.get(rc);
if (code == Code.OK) {
result.complete(null);
} else {
MetadataStoreException e = getException(code, path);
result.completeExceptionally(e);
}
}
}, null);
return result;
}

@Override
protected void batchOperation(List<MetadataOp> ops) {
try {
Expand Down