Skip to content

Commit

Permalink
Refactor read only chunk timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 12, 2024
1 parent a5a3209 commit 75c65b6
Showing 1 changed file with 7 additions and 22 deletions.
29 changes: 7 additions & 22 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.slack.kaldb.chunk;

import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.google.common.annotations.VisibleForTesting;
import com.slack.kaldb.blobfs.BlobFs;
import com.slack.kaldb.logstore.search.LogIndexSearcher;
Expand Down Expand Up @@ -48,10 +50,6 @@
public class ReadOnlyChunkImpl<T> implements Chunk<T> {

private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyChunkImpl.class);

@Deprecated // replace with sync methods, which use DEFAULT_ZK_TIMEOUT_SECS where possible
private static final int TIMEOUT_MS = 5000;

private ChunkInfo chunkInfo;
private LogIndexSearcher<T> logSearcher;
private SearchMetadata searchMetadata;
Expand Down Expand Up @@ -174,20 +172,14 @@ public static SearchMetadata registerSearchMetadata(
snapshotName, cacheSearchContext.hostname),
snapshotName,
cacheSearchContext.toUrl());
searchMetadataStore
.createAsync(metadata)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
searchMetadataStore.createSync(metadata);
return metadata;
}

private void unregisterSearchMetadata()
throws ExecutionException, InterruptedException, TimeoutException {
if (this.searchMetadata != null) {
searchMetadataStore
.deleteAsync(searchMetadata)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
searchMetadataStore.deleteSync(searchMetadata);
}
}

Expand Down Expand Up @@ -265,15 +257,8 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {

private SnapshotMetadata getSnapshotMetadata(String replicaId)
throws ExecutionException, InterruptedException, TimeoutException {
ReplicaMetadata replicaMetadata =
replicaMetadataStore
.findAsync(replicaId)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
return snapshotMetadataStore
.findAsync(replicaMetadata.snapshotId)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ReplicaMetadata replicaMetadata = replicaMetadataStore.findSync(replicaId);
return snapshotMetadataStore.findSync(replicaMetadata.snapshotId);
}

// We lock access when manipulating the chunk, as the close()
Expand Down Expand Up @@ -319,7 +304,7 @@ private boolean setChunkMetadataState(
try {
cacheSlotMetadataStore
.updateNonFreeCacheSlotState(cacheSlotMetadata, newState)
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
.get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.MILLISECONDS);
return true;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Error setting chunk metadata state", e);
Expand Down

0 comments on commit 75c65b6

Please sign in to comment.