Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复MetaDataSyncer获取Partition的incVersion时可能存在的npe问题 #441

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.havenask.engine.HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
Expand All @@ -28,6 +29,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -268,7 +270,7 @@ protected void runInternal() {
);
}
} catch (Throwable e) {
LOGGER.error("update searcher heartbeat target failed", e);
LOGGER.info("update searcher heartbeat target failed", e);
}

searcherSynced.set(false);
Expand Down Expand Up @@ -317,7 +319,7 @@ protected void runInternal() {
}

} catch (Throwable e) {
LOGGER.error("update qrs heartbeat target failed, ", e);
LOGGER.info("update qrs heartbeat target failed, ", e);
}

qrsSynced.set(false);
Expand Down Expand Up @@ -490,14 +492,26 @@ public UpdateHeartbeatTargetRequest createSearcherUpdateHeartbeatTargetRequest(C
{
Path versionPath = defaultRuntimeDataPath.resolve(index).resolve(INDEX_SUB_PATH0);
TargetInfo.TableInfo.Partition curPartition = new TargetInfo.TableInfo.Partition();
curPartition.inc_version = extractIncVersion(Utils.getIndexMaxVersion(versionPath));
Long inc_version = Utils.getIndexMaxVersionNum(versionPath);
if (inc_version == -1L) {
throw new FileNotFoundException(
String.format(Locale.ROOT, "inc_version does not exist under directory [%s]", versionPath)
);
}
curPartition.inc_version = inc_version;
partitions.put(DEFAULT_PARTITION_NAME0, curPartition);
}

{
Path versionPath = defaultRuntimeDataPath.resolve(index).resolve(INDEX_SUB_PATH1);
TargetInfo.TableInfo.Partition curPartition = new TargetInfo.TableInfo.Partition();
curPartition.inc_version = extractIncVersion(Utils.getIndexMaxVersion(versionPath));
Long inc_version = Utils.getIndexMaxVersionNum(versionPath);
if (inc_version == -1L) {
throw new FileNotFoundException(
String.format(Locale.ROOT, "inc_version does not exist under directory [%s]", versionPath)
);
}
curPartition.inc_version = inc_version;
partitions.put(DEFAULT_PARTITION_NAME1, curPartition);
}

Expand All @@ -513,7 +527,17 @@ public UpdateHeartbeatTargetRequest createSearcherUpdateHeartbeatTargetRequest(C
String partitionId = RangeUtil.getRangeName(totalPartitionCount, shardId);
TargetInfo.TableInfo.Partition curPartition = new TargetInfo.TableInfo.Partition();
Path versionPath = defaultRuntimeDataPath.resolve(index).resolve("generation_0").resolve(partitionName);
curPartition.inc_version = extractIncVersion(Utils.getIndexMaxVersion(versionPath));
try {
Long inc_version = Utils.getIndexMaxVersionNum(versionPath);
if (inc_version == -1L) {
throw new FileNotFoundException(
String.format(Locale.ROOT, "inc_version does not exist under directory [%s]", versionPath)
);
}
curPartition.inc_version = inc_version;
} catch (IOException e) {
throw new RuntimeException(e);
}
partitions.put(partitionId, curPartition);
});
curTableInfo = new TargetInfo.TableInfo(tableMode, tableType, configPath, indexRoot, totalPartitionCount, partitions);
Expand All @@ -530,19 +554,6 @@ public UpdateHeartbeatTargetRequest createSearcherUpdateHeartbeatTargetRequest(C
return new UpdateHeartbeatTargetRequest(searcherTargetInfo);
}

private static int extractIncVersion(String versionStr) {
String pattern = "version\\.(\\d+)";
Pattern regex = Pattern.compile(pattern);
Matcher matcher = regex.matcher(versionStr);
if (matcher.find()) {
String numberStr = matcher.group(1);
int number = Integer.parseInt(numberStr);
return number;
} else {
return -1;
}
}

private static void createConfigLink(String zoneName, String prefix, String bizName, Path configPath, Path dataPath)
throws IOException {
final String zoneConfig = "zone_config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,13 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
Map<String, StoreFileMetadata> getHavenaskMetadata(IndexCommit commit) throws IOException {
long commitVersion = 0;
if (commit == null) {
String maxIndexVersionFile = Utils.getIndexMaxVersion(shardPath);
if (maxIndexVersionFile != null) {
commitVersion = Long.parseLong(maxIndexVersionFile.substring(maxIndexVersionFile.indexOf('.') + 1));
try {
Long maxIndexVersionFileNum = Utils.getIndexMaxVersionNum(shardPath);
if (maxIndexVersionFileNum != -1L) {
commitVersion = maxIndexVersionFileNum;
}
} catch (IOException e) {
// ignore
}
} else if (commit.getUserData().containsKey(HavenaskCommitInfo.COMMIT_VERSION_KEY)) {
commitVersion = Long.valueOf(commit.getUserData().get(HavenaskCommitInfo.COMMIT_VERSION_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public int hashCode() {
public String check_index_path;
public Integer deploy_status;
public List<List<Object>> deploy_status_map;
public Integer inc_version;
public Long inc_version;
public Integer keep_count;
public String loaded_config_path;
public String loaded_index_root;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ public static String getIndexMaxVersion(Path versionFilePath) {
}
}

/**
* return the max version file's version number under the certain index directory
*/
public static long getIndexMaxVersionNum(Path versionFilePath) throws IOException {
try (Stream<Path> stream = Files.list(versionFilePath)) {
long maxVersion = stream.map(path -> path.getFileName().toString())
.filter(s -> s.matches("version\\.\\d+"))
.map(s -> Long.parseLong(s.substring(s.indexOf('.') + 1)))
.max(Long::compare)
.orElse(-1L);
return maxVersion;
} catch (IOException e) {
logger.info("directory [{}] does not exist or the version num is too big", versionFilePath);
throw e;
}
}

/**
* return the loactor in the version file
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,21 @@ public void testCleanVersionPublishFiles() {
logger.error("list directory [{}] error", dirPath);
}
}

public void testGetIndexMaxVersionNum() throws Exception {
Path home = createTempDir();
Files.createFile(home.resolve("version.1"));
Files.createFile(home.resolve("version.10"));
Files.createFile(home.resolve("version.11"));
long incVersion = Utils.getIndexMaxVersionNum(home);
assertEquals(11L, incVersion);

Path illegalPathVersionDir = home.resolve("illegalPathVersionDir");
Files.createDirectory(illegalPathVersionDir);
Files.createFile(illegalPathVersionDir.resolve("illegalversion.1"));
Files.createFile(illegalPathVersionDir.resolve("illegalversion.2"));
Files.createFile(illegalPathVersionDir.resolve("illegalversion.3"));
long illegalIncVersion = Utils.getIndexMaxVersionNum(illegalPathVersionDir);
assertEquals(-1L, illegalIncVersion);
}
}
Loading