Skip to content

Commit

Permalink
Merge branch 'main' into feature/support-get-from-searcher
Browse files Browse the repository at this point in the history
* main:
  修复反复创建删除相同索引时没有正确加锁的问题 (alibaba#357)
  修复havenask config配置schema异常问题 (alibaba#363)
  sleep抛出的异常时,直接抛出方法能处理的异常 (alibaba#360)
  • Loading branch information
weizijun committed Dec 13, 2023
2 parents 4226c67 + 291d4f2 commit bc69624
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,34 @@

package org.havenask.engine;

import static org.havenask.engine.index.config.generator.BizConfigGenerator.BIZ_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.CLUSTER_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.DATA_TABLES_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_BIZ_CONFIG;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.PLUGINS_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.SCHEMAS_DIR;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_CONTENT_TEMPLATE;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.SCHEMA_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_NAME;
import static org.havenask.engine.index.config.generator.TableConfigGenerator.TABLE_DIR;
import static org.havenask.env.Environment.PATH_HOME_SETTING;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
Expand All @@ -46,27 +66,6 @@
import org.havenask.plugins.NodeEnvironmentPlugin.CustomEnvironment;
import org.havenask.threadpool.ThreadPool;

import static org.havenask.engine.index.config.generator.BizConfigGenerator.BIZ_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.CLUSTER_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.DATA_TABLES_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_BIZ_CONFIG;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.PLUGINS_DIR;
import static org.havenask.engine.index.config.generator.BizConfigGenerator.SCHEMAS_DIR;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_CONTENT_TEMPLATE;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.SCHEMA_FILE_NAME;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_CONTENT;
import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_NAME;
import static org.havenask.engine.index.config.generator.TableConfigGenerator.TABLE_DIR;
import static org.havenask.env.Environment.PATH_HOME_SETTING;

public class HavenaskEngineEnvironment implements CustomEnvironment {
private static final Logger LOGGER = LogManager.getLogger(HavenaskEngineEnvironment.class);
public static final String DEFAULT_DATA_PATH = "havenask";
Expand Down Expand Up @@ -212,6 +211,8 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin
if (metaDataSyncer == null) {
throw new RuntimeException("metaDataSyncer is null while deleting index");
}
metaDataSyncer.addIndexLock(tableName);
LOGGER.debug("get lock while deleting index, table name :[{}]", tableName);
// TODO: ThreadPool的获取是否可以优化
final ThreadPool threadPool = metaDataSyncer.getThreadPool();
asyncRemoveIndexDir(threadPool, tableName, indexDir);
Expand All @@ -225,20 +226,18 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
if (metaDataSyncer == null) {
throw new RuntimeException("metaDataSyncer is null while deleting shard");
}

String tableName = indexSettings.getIndex().getName();
metaDataSyncer.addShardLock(lock.getShardId());
LOGGER.debug("get lock while deleting shard, table name :[{}]", lock.getShardId().getIndexName());
String partitionId = RangeUtil.getRangeName(indexSettings.getNumberOfShards(), lock.getShardId().id());
final ThreadPool threadPool = metaDataSyncer.getThreadPool();
asyncRemoveShardRuntimeDir(threadPool, tableName, partitionId, shardDir);
asyncRemoveShardRuntimeDir(threadPool, lock.getShardId(), partitionId, shardDir);
}

/**
* 异步移除删除索引后runtimedata的数据信息
*/
public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, Path indexDir) {
threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> {
ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName);
indexLock.lock();
LOGGER.debug("get lock while deleting index, table name :[{}]", tableName);
try {
if (metaDataSyncer != null) {
Expand All @@ -260,7 +259,6 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P
LOGGER.warn("remove index dir failed, table name: [{}], error: [{}]", tableName, e);
} finally {
metaDataSyncer.deleteIndexLock(tableName);
indexLock.unlock();
LOGGER.debug("release lock after deleting index, table name :[{}]", tableName);
}
});
Expand All @@ -269,20 +267,17 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P
/**
* 异步删除减少shard时runtimedata内的数据信息
*/
public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String tableName, String partitionId, Path shardDir) {
public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, ShardId shardId, String partitionId, Path shardDir) {
threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> {
ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName);
indexLock.lock();
LOGGER.debug("get lock while deleting shard, table name :[{}], partitionId:[{}]", tableName, partitionId);
String tableName = shardId.getIndexName();
try {
if (metaDataSyncer != null) {
metaDataSyncer.setSearcherPendingSync();
try {
checkShardIsDeletedInSearcher(metaDataSyncer, tableName, partitionId);
} catch (IOException e) {
LOGGER.error(
"checkShardIsDeletedInSearcher failed while deleting shard, "
+ "table name: [{}], partitionId:[{}], error: [{}]",
"checkShardIsDeletedInSearcher failed while deleting shard, " + "index: [{}], partitionId:[{}], error: [{}]",
tableName,
partitionId,
e
Expand All @@ -295,9 +290,8 @@ public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String table
} catch (Exception e) {
LOGGER.warn("remove shard dir failed, table name: [{}],partitionId:[{}], error: [{}]", tableName, partitionId, e);
} finally {
metaDataSyncer.deleteIndexLock(tableName);
indexLock.unlock();
LOGGER.debug("release lock after deleting index, table name :[{}], partitionId[{}]", tableName, partitionId);
metaDataSyncer.deleteShardLock(shardId);
LOGGER.debug("release lock after deleting shard, table name :[{}], partitionId[{}]", tableName, partitionId);
}
});
}
Expand All @@ -320,7 +314,8 @@ private void checkIndexIsDeletedInSearcher(MetaDataSyncer metaDataSyncer, String
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException ex) {
// pass
LOGGER.debug("check havenask table status interrupted while deleting index");
throw new IOException("check havenask table status interrupted while deleting index");
}
}

Expand Down Expand Up @@ -372,7 +367,8 @@ private void checkShardIsDeletedInSearcher(MetaDataSyncer metaDataSyncer, String
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException ex) {
// pass
LOGGER.debug("check havenask table status interrupted while deleting index");
throw new IOException("check havenask table status interrupted while deleting index");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.mina.util.ConcurrentHashSet;
import org.havenask.cluster.ClusterChangedEvent;
import org.havenask.cluster.ClusterState;
import org.havenask.cluster.ClusterStateApplier;
Expand All @@ -67,6 +65,7 @@
import org.havenask.engine.util.RangeUtil;
import org.havenask.engine.util.Utils;
import org.havenask.index.Index;
import org.havenask.index.shard.ShardId;
import org.havenask.threadpool.ThreadPool;

import com.carrotsearch.hppc.cursors.ObjectCursor;
Expand Down Expand Up @@ -125,8 +124,8 @@ public class MetaDataSyncer extends AbstractLifecycleComponent implements Cluste
private AtomicReference<TargetInfo> searcherTargetInfo = new AtomicReference<>();
private int syncTimes = 0;
private int qrsSyncTimes = 0;

private ConcurrentMap<String, ReentrantLock> indexLockMap = new ConcurrentHashMap<>();
private ConcurrentHashSet<String> indexLockSet = new ConcurrentHashSet<>();
private ConcurrentHashSet<String> shardLockSet = new ConcurrentHashSet<>();

public MetaDataSyncer(
ClusterService clusterService,
Expand Down Expand Up @@ -163,19 +162,31 @@ public ThreadPool getThreadPool() {
return this.threadPool;
}

public ReentrantLock getIndexLockAndCreateIfNotExist(String tableName) {
if (indexLockMap.containsKey(tableName) == false) {
indexLockMap.put(tableName, new ReentrantLock());
}
return indexLockMap.get(tableName);
/**
 * Marks the given table (index) as locked for an in-flight asynchronous deletion.
 *
 * @param tableName havenask table name (same as the index name)
 * @return {@code true} if the table was not already marked, {@code false} if a
 *         deletion lock for it was already present
 */
public boolean addIndexLock(String tableName) {
return indexLockSet.add(tableName);
}

/**
 * Checks whether a deletion lock is currently held for the given table.
 * NOTE(review): despite the "get" prefix this is an existence check; a name
 * like {@code hasIndexLock} would be clearer, but renaming would break callers.
 *
 * @param tableName havenask table name (same as the index name)
 * @return {@code true} if the table is currently marked as being deleted
 */
public boolean getIndexLock(String tableName) {
return indexLockSet.contains(tableName);
}

/**
 * Releases the deletion lock for the given table, if present.
 *
 * @param tableName havenask table name (same as the index name)
 * @return {@code true} if a lock was present and removed, {@code false} otherwise
 */
public boolean deleteIndexLock(String tableName) {
return indexLockSet.remove(tableName);
}

/**
 * Marks the given shard as locked for an in-flight asynchronous deletion.
 * The lock key combines the index name and the shard id so that different
 * shards of the same index are tracked independently.
 *
 * @param shardId the shard being deleted
 * @return {@code true} if the shard was not already marked, {@code false} otherwise
 */
public boolean addShardLock(ShardId shardId) {
    return shardLockSet.add(shardId.getIndexName() + "_" + shardId.getId());
}

public ReentrantLock getIndexLock(String tableName) {
return indexLockMap.get(tableName);
/**
 * Checks whether a deletion lock is currently held for the given shard.
 * Uses the same "indexName_shardId" key format as {@code addShardLock}.
 *
 * @param shardId the shard to check
 * @return {@code true} if the shard is currently marked as being deleted
 */
public boolean getShardLock(ShardId shardId) {
    return shardLockSet.contains(shardId.getIndexName() + "_" + shardId.getId());
}

public void deleteIndexLock(String tableName) {
indexLockMap.remove(tableName);
/**
 * Releases the deletion lock for the given shard, if present.
 * Uses the same "indexName_shardId" key format as {@code addShardLock}.
 *
 * @param shardId the shard whose lock should be released
 * @return {@code true} if a lock was present and removed, {@code false} otherwise
 */
public boolean deleteShardLock(ShardId shardId) {
    return shardLockSet.remove(shardId.getIndexName() + "_" + shardId.getId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

package org.havenask.engine.index;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.HavenaskException;
Expand All @@ -41,17 +38,9 @@ public HavenaskIndexEventListener(HavenaskEngineEnvironment env, MetaDataSyncer
}

@Override
public void afterIndexCreated(IndexService indexService) {
public void afterIndexMappingUpdate(IndexService indexService) {
String tableName = indexService.index().getName();
ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName);
try {
if (indexLock != null) {
if (indexLock.tryLock(60, TimeUnit.SECONDS)) {
LOGGER.debug("get lock while creating index, table name :[{}]", tableName);
} else {
LOGGER.debug("failed to get lock while creating index, out of time, table name :[{}]", tableName);
}
}
BizConfigGenerator.generateBiz(
tableName,
indexService.getIndexSettings().getSettings(),
Expand All @@ -66,34 +55,14 @@ public void afterIndexCreated(IndexService indexService) {
);
} catch (Exception e) {
throw new HavenaskException("generate havenask config error : ", e);
} finally {
if (indexLock != null) {
try {
indexLock.unlock();
LOGGER.debug("release lock after creating index, table name :[{}]", tableName);
} catch (IllegalMonitorStateException e) {
LOGGER.error("release lock error after creating index", e);
}
}
}
}

@Override
public void afterIndexShardCreated(IndexShard indexShard) {
String tableName = indexShard.shardId().getIndexName();
ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName);
checkIndexIsDeleted(indexShard);

try {
if (indexLock != null) {
if (indexLock.tryLock(60, TimeUnit.SECONDS)) {
LOGGER.debug("get lock while creating shard, index name: [{}], shardId :[{}]", tableName, indexShard.shardId().getId());
} else {
LOGGER.debug(
"failed to get lock while creating shard, out of time, index name: [{}], shardId :[{}]",
tableName,
indexShard.shardId().getId()
);
}
}
// 初始化segment信息
RuntimeSegmentGenerator.generateRuntimeSegment(
indexShard.shardId(),
Expand All @@ -104,19 +73,29 @@ public void afterIndexShardCreated(IndexShard indexShard) {
);
} catch (Exception e) {
throw new HavenaskException("generate havenask config error", e);
} finally {
if (indexLock != null) {
try {
indexLock.unlock();
LOGGER.debug(
"release lock after creating shard, table name :[{}], shardId :[{}]",
tableName,
indexShard.shardId().getId()
);
} catch (IllegalMonitorStateException e) {
LOGGER.error("release lock error after creating shard", e);
}
}

/**
 * Waits for any pending asynchronous deletion of this shard's index to finish,
 * so that creating a shard does not race with the removal of a previous
 * index/shard of the same name.
 *
 * Polls the metaDataSyncer index- and shard-lock sets once per second for up
 * to 60 seconds. On timeout it only logs an error and returns, letting shard
 * creation proceed (best-effort, matching the original behavior).
 *
 * @param indexShard the shard about to be created
 */
private void checkIndexIsDeleted(IndexShard indexShard) {
    int loopCount = 60;   // maximum number of polls before giving up
    int sleepTime = 1000; // poll interval in milliseconds

    String tableName = indexShard.shardId().getIndexName();
    try {
        while (loopCount > 0) {
            boolean indexLockExist = metaDataSyncer.getIndexLock(tableName);
            // fixed: local was named "ShardLockExist", violating lowerCamelCase
            boolean shardLockExist = metaDataSyncer.getShardLock(indexShard.shardId());
            if (indexLockExist == false && shardLockExist == false) {
                break;
            }
            Thread.sleep(sleepTime);
            loopCount--;
        }
        if (loopCount == 0) {
            LOGGER.error("checkIndexIsDeleted out of time while create shard");
        }
    } catch (InterruptedException e) {
        // fixed: restore the interrupt status instead of swallowing it, so
        // callers up the stack can still observe the interruption
        Thread.currentThread().interrupt();
        LOGGER.error("checkIndexIsDeleted error while create shard", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ private void checkTableStatus() throws IOException {
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException ex) {
// pass
throw new IOException("shard [" + engineConfig.getShardId() + "] check havenask table status interrupted", ex);
}
}
}
Expand Down Expand Up @@ -567,16 +567,21 @@ static WriteResponse retryWrite(ShardId shardId, SearcherClient searcherClient,
try {
Thread.sleep(timeValue.millis());
} catch (InterruptedException e) {
// pass
LOGGER.info(
"[{}] havenask write retry interrupted, retry count: {}, cost: {} ms",
shardId,
retryCount,
System.currentTimeMillis() - start
);
return writeResponse;
}
writeResponse = searcherClient.write(writeRequest);
retryCount++;
if (false == isWriteRetry(writeResponse)) {
break;
}
}
long cost = System.currentTimeMillis() - start;
LOGGER.info("[{}] havenask write retry, retry count: {}, cost: {} ms", shardId, retryCount, cost);
LOGGER.info("[{}] havenask write retry, retry count: {}, cost: {} ms", shardId, retryCount, System.currentTimeMillis() - start);
}

return writeResponse;
Expand Down
Loading

0 comments on commit bc69624

Please sign in to comment.