[Remote Store] Change remote purge threadpool to fixed instead of scaling to limit i… #12247

Closed · wants to merge 4 commits into from
@@ -1312,21 +1312,25 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List<Strin
      * @param committedManifest last committed ClusterMetadataManifest
      */
     public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
-        threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
-            String clusterName = clusterState.getClusterName().value();
-            logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
-            Set<String> allClustersUUIDsInRemote;
-            try {
-                allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
-            } catch (IOException e) {
-                logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
-                return;
-            }
-            // Retain last 2 cluster uuids data
-            allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
-            allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
-            deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
-        });
+        try {
+            threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
+                String clusterName = clusterState.getClusterName().value();
+                logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
+                Set<String> allClustersUUIDsInRemote;
+                try {
+                    allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
+                } catch (IOException e) {
+                    logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
+                    return;
+                }
+                // Retain last 2 cluster uuids data
+                allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
+                allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
+                deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
+            });
+        } catch (Exception e) {
+            logger.error("Exception occurred while scheduling deletion of stale cluster UUIDs from remote store", e);
+        }
     }
 
     public RemotePersistenceStats getStats() {
@@ -408,29 +408,33 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
      */
     public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) {
         logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep);
-        transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
-            @Override
-            public void onResponse(Set<String> folders) {
-                Set<Long> primaryTermsInRemote = folders.stream().filter(folderName -> {
-                    try {
-                        Long.parseLong(folderName);
-                        return true;
-                    } catch (Exception ignored) {
-                        // NO-OP
-                    }
-                    return false;
-                }).map(Long::parseLong).collect(Collectors.toSet());
-                Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
-                    .filter(term -> term < minPrimaryTermToKeep)
-                    .collect(Collectors.toSet());
-                primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
-            }
+        try {
+            transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
+                @Override
+                public void onResponse(Set<String> folders) {
+                    Set<Long> primaryTermsInRemote = folders.stream().filter(folderName -> {
+                        try {
+                            Long.parseLong(folderName);
+                            return true;
+                        } catch (Exception ignored) {
+                            // NO-OP
+                        }
+                        return false;
+                    }).map(Long::parseLong).collect(Collectors.toSet());
+                    Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
+                        .filter(term -> term < minPrimaryTermToKeep)
+                        .collect(Collectors.toSet());
+                    primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
+                }
 
-            @Override
-            public void onFailure(Exception e) {
-                logger.error("Exception occurred while getting primary terms from remote store", e);
-            }
-        });
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error("Exception occurred while getting primary terms from remote store", e);
+                }
+            });
+        } catch (Exception e) {
+            logger.error("Exception occurred while scheduling listing primary terms from remote store", e);
+        }
Comment on lines +435 to +437

Member:

This isn't right; other issues could be masked by blindly try/catching exceptions. Why isn't the action listener's onFailure called?

Collaborator (Author):

The action listener comes into play only after the list call is successful. In this case, the list call itself fails.

Member:

OK, maybe onFailure isn't the best signal - what would be a better signal? Exceptions are mysterious (undocumented) and expensive.
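For context on this exchange, here is a minimal plain-JDK sketch of the failure mode being discussed (it uses only standard java.util.concurrent classes, not the OpenSearch TransferService): with a fixed pool over a bounded queue, the submission itself can throw RejectedExecutionException on the caller's thread, so a listener handed to the task is never invoked.

```java
import java.util.concurrent.*;

public class RejectionDemo {
    public static void main(String[] args) {
        // One worker thread and a queue of capacity 1: a third submission
        // cannot be accepted while the first task is still running.
        ExecutorService pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));

        Runnable slow = () -> {
            try {
                Thread.sleep(1_000);
            } catch (InterruptedException ignored) {}
        };

        pool.execute(slow); // picked up by the worker
        pool.execute(slow); // parked in the queue
        try {
            pool.execute(slow); // queue full: throws synchronously
        } catch (RejectedExecutionException e) {
            // The rejected task body never ran, so any callback it would have
            // fired (the analogue of the ActionListener here) is skipped
            // unless the submission site catches the rejection itself.
            System.out.println("Rejected before the task could run: " + e);
        }
        pool.shutdown();
    }
}
```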

     }
 
     /**
@@ -457,18 +461,22 @@ public void onFailure(Exception e) {
     }
 
     public void delete() {
-        // cleans up all the translog contents in async fashion
-        transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() {
-            @Override
-            public void onResponse(Void unused) {
-                logger.info("Deleted all remote translog data");
-            }
+        // cleans up all the translog contents in async fashion in a best effort way
+        try {
+            transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() {
+                @Override
+                public void onResponse(Void unused) {
+                    logger.info("Deleted all remote translog data");
+                }
 
-            @Override
-            public void onFailure(Exception e) {
-                logger.error("Exception occurred while cleaning translog", e);
-            }
-        });
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error("Exception occurred while cleaning translog", e);
+                }
+            });
+        } catch (Exception e) {
+            logger.error("Exception occurred while scheduling delete from remote store", e);
+        }
     }
 
     public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {

@@ -547,7 +555,14 @@ public void onFailure(Exception e) {
             );
         } catch (Exception e) {
             onCompletion.run();
-            throw e;
+            logger.error(
+                () -> new ParameterizedMessage(
+                    "Exception occurred while deleting translog for primaryTerm={} files={}",
+                    primaryTerm,
+                    files
+                ),
+                e
+            );
         }
     }
@@ -183,7 +183,7 @@ public static ThreadPoolType fromType(String type) {
         map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED);
         map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING);
         map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED);
-        map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING);
+        map.put(Names.REMOTE_PURGE, ThreadPoolType.FIXED);
Member:

Is this creating overhead that will be unused on some cluster configurations?

Collaborator (Author):

Yes, but the overhead is quite minimal, given the size of the pool.

Member:

Can you help me understand how this pool is used? I don't have a sense for what 'minimal' means as some customers operate OpenSearch on very resource constrained systems such as EC2's T3 instance types.

Collaborator (Author):

This pool is used for async deletions related to the remote store.

> I don't have a sense for what 'minimal' means as some customers operate OpenSearch on very resource constrained systems such as EC2's T3 instance types.

Actually, on a domain with no remote store there is not going to be any overhead. A fixed-size threadpool creates a SizeBlockingQueue with some fixed size. With no remote purges happening, it will not create any overhead, as the queue will not have any items in the first place.

For remote store enabled clusters, this will help limit the memory impact of this queue by limiting its size.
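To make the memory argument concrete, here is a rough plain-JDK approximation (OpenSearch's ScalingExecutorBuilder and FixedExecutorBuilder use their own queue and sizing logic, so treat this as a sketch rather than the actual implementation): the point of the fixed pool is the hard bound on pending work.

```java
import java.util.concurrent.*;

public class QueueBoundsSketch {
    public static void main(String[] args) {
        // Unbounded queue: a burst of purge tasks can accumulate without
        // limit, each pending Runnable holding heap until a worker drains it.
        BlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<>();

        // Bounded queue of 10_000 entries (matching the size passed to the
        // PR's FixedExecutorBuilder): once full, further submissions are
        // rejected instead of growing memory, which is exactly what the
        // try/catch blocks added above are guarding against.
        BlockingQueue<Runnable> bounded = new ArrayBlockingQueue<>(10_000);

        System.out.println("unbounded capacity: " + unbounded.remainingCapacity()); // Integer.MAX_VALUE
        System.out.println("bounded capacity:   " + bounded.remainingCapacity());   // 10000

        // Idle cost is near zero either way: an empty queue holds no task
        // objects, which matches the point that clusters without remote
        // store see no meaningful overhead from the fixed pool.
    }
}
```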

         map.put(Names.REMOTE_REFRESH_RETRY, ThreadPoolType.SCALING);
         map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING);
         map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
@@ -265,7 +265,7 @@ public ThreadPool(
             new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProc, TimeValue.timeValueMinutes(5))
         );
         builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000));
-        builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProc, TimeValue.timeValueMinutes(5)));
+        builders.put(Names.REMOTE_PURGE, new FixedExecutorBuilder(settings, Names.REMOTE_PURGE, halfProc, 10000));
         builders.put(
             Names.REMOTE_REFRESH_RETRY,
             new ScalingExecutorBuilder(Names.REMOTE_REFRESH_RETRY, 1, halfProc, TimeValue.timeValueMinutes(5))