Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the robustness of old ZooKeeper log removal #1040

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
Expand Down Expand Up @@ -248,38 +247,78 @@ private void deleteLogs() throws Exception {

final long minAllowedTimestamp = System.currentTimeMillis() - cfg.minLogAgeMillis();
final int targetCount = children.size() - cfg.maxLogCount();
final List<String> deleted = new ArrayList<>(targetCount);
final List<String> deletedPaths = new ArrayList<>(targetCount);
children.sort(Comparator.comparingLong(Long::parseLong));
try {
for (int i = 0; i < targetCount; ++i) {
final String logPath = absolutePath(LOG_PATH, children.get(i));
final LogMeta meta = Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class);
final String childName = children.get(i);
final String logPath = absolutePath(LOG_PATH, childName);
final LogMeta logMeta = readLogMeta(logPath);
if (logMeta == null) {
continue;
}

if (meta.timestamp() >= minAllowedTimestamp) {
if (logMeta.timestamp() >= minAllowedTimestamp) {
// Do not delete the logs that are not old enough.
// We can break the loop here because the 'children' has been sorted by
// insertion order (sequence value).
break;
}

deleteLog(curator.transactionOp().delete().forPath(logPath), deleted, children.get(i));
for (long blockId : meta.blocks()) {
// Delete the log blocks first, so that we never have dangling log blocks.
for (long blockId : logMeta.blocks()) {
final String blockPath = absolutePath(LOG_BLOCK_PATH) + '/' + pathFromRevision(blockId);
deleteLog(curator.transactionOp().delete().forPath(blockPath),
deleted, children.get(i));
deleteLogBlock(logPath, logMeta, blockPath, deletedPaths);
}
deleteLog(logPath, logMeta, deletedPaths);
}
} finally {
logger.info("delete logs: {}", deleted);
logger.info("Deleted ZooKeeper nodes: {}", deletedPaths);
}
}

private void deleteLog(CuratorOp curatorOp, List<String> deleted, String childName) {
@Nullable
private LogMeta readLogMeta(String logPath) throws Exception {
try {
curator.transaction().forOperations(curatorOp);
deleted.add(childName);
} catch (Throwable t) {
logger.warn("Failed to delete ZooKeeper log: {}", childName, t);
return Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NONODE) {
logger.warn("Attempted to read a missing log from ZooKeeper; " +
"maybe deleted already? logPath: {}", logPath, e);
return null;
} else {
throw e;
}
}
}

private void deleteLog(String logPath, LogMeta logMeta, List<String> deletedPaths) throws Exception {
try {
curator.delete().forPath(logPath);
deletedPaths.add(logPath);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NONODE) {
logger.warn("Attempted to delete a missing log from ZooKeeper; " +
"maybe deleted already? logPath: {}, logMeta: {}", logPath, logMeta, e);
} else {
throw e;
}
}
}

private void deleteLogBlock(String logPath, LogMeta logMeta, String blockPath,
List<String> deletedPaths) throws Exception {
try {
curator.delete().forPath(blockPath);
deletedPaths.add(blockPath);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NONODE) {
logger.warn("Attempted to delete a missing log block from ZooKeeper; " +
"maybe deleted already? blockPath: {}, logPath: {}, logMeta: {}",
blockPath, logPath, logMeta, e);
} else {
throw e;
}
}
}
}
Expand Down
Loading