diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java index c3d831ac..e948e748 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java @@ -14,6 +14,27 @@ package org.havenask.engine; +import static org.havenask.engine.index.config.generator.BizConfigGenerator.BIZ_DIR; +import static org.havenask.engine.index.config.generator.BizConfigGenerator.CLUSTER_DIR; +import static org.havenask.engine.index.config.generator.BizConfigGenerator.DATA_TABLES_DIR; +import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_BIZ_CONFIG; +import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_DIR; +import static org.havenask.engine.index.config.generator.BizConfigGenerator.PLUGINS_DIR; +import static org.havenask.engine.index.config.generator.BizConfigGenerator.SCHEMAS_DIR; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_CONTENT_TEMPLATE; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_NAME; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_CONTENT; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_NAME; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_CONTENT; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_NAME; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_CONTENT; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_NAME; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.SCHEMA_FILE_NAME; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_CONTENT; +import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_NAME; +import static org.havenask.engine.index.config.generator.TableConfigGenerator.TABLE_DIR; +import static org.havenask.env.Environment.PATH_HOME_SETTING; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -21,7 +42,6 @@ import java.nio.file.StandardOpenOption; import java.util.Locale; import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.logging.log4j.LogManager; @@ -46,27 +66,6 @@ import org.havenask.plugins.NodeEnvironmentPlugin.CustomEnvironment; import org.havenask.threadpool.ThreadPool; -import static org.havenask.engine.index.config.generator.BizConfigGenerator.BIZ_DIR; -import static org.havenask.engine.index.config.generator.BizConfigGenerator.CLUSTER_DIR; -import static org.havenask.engine.index.config.generator.BizConfigGenerator.DATA_TABLES_DIR; -import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_BIZ_CONFIG; -import static org.havenask.engine.index.config.generator.BizConfigGenerator.DEFAULT_DIR; -import static org.havenask.engine.index.config.generator.BizConfigGenerator.PLUGINS_DIR; -import static org.havenask.engine.index.config.generator.BizConfigGenerator.SCHEMAS_DIR; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_CONTENT_TEMPLATE; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.DEPLOY_META_FILE_NAME; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_CONTENT; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.ENTRY_TABLE_FILE_NAME; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_CONTENT; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_FORMAT_VERSION_FILE_NAME; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_CONTENT; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.INDEX_PARTITION_META_FILE_NAME; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.SCHEMA_FILE_NAME; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_CONTENT; -import static org.havenask.engine.index.config.generator.RuntimeSegmentGenerator.VERSION_FILE_NAME; -import static org.havenask.engine.index.config.generator.TableConfigGenerator.TABLE_DIR; -import static org.havenask.env.Environment.PATH_HOME_SETTING; - public class HavenaskEngineEnvironment implements CustomEnvironment { private static final Logger LOGGER = LogManager.getLogger(HavenaskEngineEnvironment.class); public static final String DEFAULT_DATA_PATH = "havenask"; @@ -212,6 +211,8 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin if (metaDataSyncer == null) { throw new RuntimeException("metaDataSyncer is null while deleting index"); } + metaDataSyncer.addIndexLock(tableName); + LOGGER.debug("get lock while deleting index, table name :[{}]", tableName); // TODO: ThreadPool的获取是否可以优化 final ThreadPool threadPool = metaDataSyncer.getThreadPool(); asyncRemoveIndexDir(threadPool, tableName, indexDir); @@ -225,11 +226,11 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet if (metaDataSyncer == null) { throw new RuntimeException("metaDataSyncer is null while deleting shard"); } - - String tableName = indexSettings.getIndex().getName(); + metaDataSyncer.addShardLock(lock.getShardId()); + LOGGER.debug("get lock while deleting shard, table name :[{}]", lock.getShardId().getIndexName()); String partitionId = RangeUtil.getRangeName(indexSettings.getNumberOfShards(), lock.getShardId().id()); final ThreadPool threadPool = metaDataSyncer.getThreadPool(); - asyncRemoveShardRuntimeDir(threadPool, tableName, partitionId, shardDir); + asyncRemoveShardRuntimeDir(threadPool, lock.getShardId(), partitionId, shardDir); } /** @@ -237,8 +238,6 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet */ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, Path indexDir) { threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> { - ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName); - indexLock.lock(); LOGGER.debug("get lock while deleting index, table name :[{}]", tableName); try { if (metaDataSyncer != null) { @@ -260,7 +259,6 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P LOGGER.warn("remove index dir failed, table name: [{}], error: [{}]", tableName, e); } finally { metaDataSyncer.deleteIndexLock(tableName); - indexLock.unlock(); LOGGER.debug("release lock after deleting index, table name :[{}]", tableName); } }); @@ -269,11 +267,9 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P /** * 异步删除减少shard时runtimedata内的数据信息 */ - public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String tableName, String partitionId, Path shardDir) { + public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, ShardId shardId, String partitionId, Path shardDir) { threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> { - ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName); - indexLock.lock(); - LOGGER.debug("get lock while deleting shard, table name :[{}], partitionId:[{}]", tableName, partitionId); + String tableName = shardId.getIndexName(); try { if (metaDataSyncer != null) { metaDataSyncer.setSearcherPendingSync(); @@ -281,8 +277,7 @@ public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String table checkShardIsDeletedInSearcher(metaDataSyncer, tableName, partitionId); } catch (IOException e) { LOGGER.error( - "checkShardIsDeletedInSearcher failed while deleting shard, " - + "table name: [{}], partitionId:[{}], error: [{}]", + "checkShardIsDeletedInSearcher failed while deleting shard, " + "index: [{}], partitionId:[{}], error: [{}]", tableName, partitionId, e @@ -295,9 +290,8 @@ public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String table } catch (Exception e) { LOGGER.warn("remove shard dir failed, table name: [{}],partitionId:[{}], error: [{}]", tableName, partitionId, e); } finally { - metaDataSyncer.deleteIndexLock(tableName); - indexLock.unlock(); - LOGGER.debug("release lock after deleting index, table name :[{}], partitionId[{}]", tableName, partitionId); + metaDataSyncer.deleteShardLock(shardId); + LOGGER.debug("release lock after deleting shard, table name :[{}], partitionId[{}]", tableName, partitionId); } }); } @@ -320,7 +314,8 @@ private void checkIndexIsDeletedInSearcher(MetaDataSyncer metaDataSyncer, String try { Thread.sleep(sleepInterval); } catch (InterruptedException ex) { - // pass + LOGGER.debug("check havenask table status interrupted while deleting index"); + throw new IOException("check havenask table status interrupted while deleting index"); } } @@ -372,7 +367,8 @@ private void checkShardIsDeletedInSearcher(MetaDataSyncer metaDataSyncer, String try { Thread.sleep(sleepInterval); } catch (InterruptedException ex) { - // pass + LOGGER.debug("check havenask table status interrupted while deleting index"); + throw new IOException("check havenask table status interrupted while deleting index"); } } diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java index e6ee61d3..824ccd8d 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java @@ -31,16 +31,14 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.mina.util.ConcurrentHashSet; import org.havenask.cluster.ClusterChangedEvent; import org.havenask.cluster.ClusterState; import org.havenask.cluster.ClusterStateApplier; @@ -67,6 +65,7 @@ import org.havenask.engine.util.RangeUtil; import org.havenask.engine.util.Utils; import org.havenask.index.Index; +import org.havenask.index.shard.ShardId; import org.havenask.threadpool.ThreadPool; import com.carrotsearch.hppc.cursors.ObjectCursor; @@ -125,8 +124,8 @@ public class MetaDataSyncer extends AbstractLifecycleComponent implements Cluste private AtomicReference searcherTargetInfo = new AtomicReference<>(); private int syncTimes = 0; private int qrsSyncTimes = 0; - - private ConcurrentMap indexLockMap = new ConcurrentHashMap<>(); + private ConcurrentHashSet indexLockSet = new ConcurrentHashSet<>(); + private ConcurrentHashSet shardLockSet = new ConcurrentHashSet<>(); public MetaDataSyncer( ClusterService clusterService, @@ -163,19 +162,31 @@ public ThreadPool getThreadPool() { return this.threadPool; } - public ReentrantLock getIndexLockAndCreateIfNotExist(String tableName) { - if (indexLockMap.containsKey(tableName) == false) { - indexLockMap.put(tableName, new ReentrantLock()); - } - return indexLockMap.get(tableName); + public boolean addIndexLock(String tableName) { + return indexLockSet.add(tableName); + } + + public boolean getIndexLock(String tableName) { + return indexLockSet.contains(tableName); + } + + public boolean deleteIndexLock(String tableName) { + return indexLockSet.remove(tableName); + } + + public boolean addShardLock(ShardId shardId) { + String tableWithShardId = shardId.getIndexName() + "_" + shardId.getId(); + return shardLockSet.add(tableWithShardId); } - public ReentrantLock getIndexLock(String tableName) { - return indexLockMap.get(tableName); + public boolean getShardLock(ShardId shardId) { + String tableWithShardId = shardId.getIndexName() + "_" + shardId.getId(); + return shardLockSet.contains(tableWithShardId); } - public void deleteIndexLock(String tableName) { - indexLockMap.remove(tableName); + public boolean deleteShardLock(ShardId shardId) { + String tableWithShardId = shardId.getIndexName() + "_" + shardId.getId(); + return shardLockSet.remove(tableWithShardId); } @Override diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java index 493de7a1..0fd19b51 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java @@ -14,9 +14,6 @@ package org.havenask.engine.index; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.havenask.HavenaskException; @@ -41,17 +38,9 @@ public HavenaskIndexEventListener(HavenaskEngineEnvironment env, MetaDataSyncer } @Override - public void afterIndexCreated(IndexService indexService) { + public void afterIndexMappingUpdate(IndexService indexService) { String tableName = indexService.index().getName(); - ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName); try { - if (indexLock != null) { - if (indexLock.tryLock(60, TimeUnit.SECONDS)) { - LOGGER.debug("get lock while creating index, table name :[{}]", tableName); - } else { - LOGGER.debug("failed to get lock while creating index, out of time, table name :[{}]", tableName); - } - } BizConfigGenerator.generateBiz( tableName, indexService.getIndexSettings().getSettings(), @@ -66,34 +55,14 @@ public void afterIndexCreated(IndexService indexService) { ); } catch (Exception e) { throw new HavenaskException("generate havenask config error : ", e); - } finally { - if (indexLock != null) { - try { - indexLock.unlock(); - LOGGER.debug("release lock after creating index, table name :[{}]", tableName); - } catch (IllegalMonitorStateException e) { - LOGGER.error("release lock error after creating index", e); - } - } } } @Override public void afterIndexShardCreated(IndexShard indexShard) { - String tableName = indexShard.shardId().getIndexName(); - ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName); + checkIndexIsDeleted(indexShard); + try { - if (indexLock != null) { - if (indexLock.tryLock(60, TimeUnit.SECONDS)) { - LOGGER.debug("get lock while creating shard, index name: [{}], shardId :[{}]", tableName, indexShard.shardId().getId()); - } else { - LOGGER.debug( - "failed to get lock while creating shard, out of time, index name: [{}], shardId :[{}]", - tableName, - indexShard.shardId().getId() - ); - } - } // 初始化segment信息 RuntimeSegmentGenerator.generateRuntimeSegment( indexShard.shardId(), @@ -104,19 +73,29 @@ public void afterIndexShardCreated(IndexShard indexShard) { ); } catch (Exception e) { throw new HavenaskException("generate havenask config error", e); - } finally { - if (indexLock != null) { - try { - indexLock.unlock(); - LOGGER.debug( - "release lock after creating shard, table name :[{}], shardId :[{}]", - tableName, - indexShard.shardId().getId() - ); - } catch (IllegalMonitorStateException e) { - LOGGER.error("release lock error after creating shard", e); + } + } + + private void checkIndexIsDeleted(IndexShard indexShard) { + int loopCount = 60; + int sleepTime = 1000; + + String tableName = indexShard.shardId().getIndexName(); + try { + while (loopCount > 0) { + boolean indexLockExist = metaDataSyncer.getIndexLock(tableName); + boolean ShardLockExist = metaDataSyncer.getShardLock(indexShard.shardId()); + if (indexLockExist == false && ShardLockExist == false) { + break; } + Thread.sleep(sleepTime); + loopCount--; + } + if (loopCount == 0) { + LOGGER.error("checkIndexIsDeleted out of time while create shard"); } + } catch (InterruptedException e) { + LOGGER.error("checkIndexIsDeleted error while create shard", e); } } } diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java index b793810f..9cc9c852 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java @@ -326,7 +326,7 @@ private void checkTableStatus() throws IOException { try { Thread.sleep(sleepInterval); } catch (InterruptedException ex) { - // pass + throw new IOException("shard [" + engineConfig.getShardId() + "] check havenask table status interrupted", ex); } } } @@ -567,7 +567,13 @@ static WriteResponse retryWrite(ShardId shardId, SearcherClient searcherClient, try { Thread.sleep(timeValue.millis()); } catch (InterruptedException e) { - // pass + LOGGER.info( + "[{}] havenask write retry interrupted, retry count: {}, cost: {} ms", + shardId, + retryCount, + System.currentTimeMillis() - start + ); + return writeResponse; } writeResponse = searcherClient.write(writeRequest); retryCount++; @@ -575,8 +581,7 @@ static WriteResponse retryWrite(ShardId shardId, SearcherClient searcherClient, break; } } - long cost = System.currentTimeMillis() - start; - LOGGER.info("[{}] havenask write retry, retry count: {}, cost: {} ms", shardId, retryCount, cost); + LOGGER.info("[{}] havenask write retry, retry count: {}, cost: {} ms", shardId, retryCount, System.currentTimeMillis() - start); } return writeResponse; diff --git a/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java b/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java index 47481345..7264d35d 100644 --- a/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java +++ b/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java @@ -121,8 +121,9 @@ public void testDeleteIndexDirectoryUnderLock() throws IOException { targetInfo.table_info = new HashMap<>(); when(metaDataSyncer.getSearcherTargetInfo()).thenReturn(targetInfo); - when(metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName)).thenReturn(indexLock); - when(metaDataSyncer.getIndexLock(tableName)).thenReturn(indexLock); + when(metaDataSyncer.addIndexLock(tableName)).thenReturn(true); + when(metaDataSyncer.getIndexLock(tableName)).thenReturn(true); + when(metaDataSyncer.deleteIndexLock(tableName)).thenReturn(true); havenaskEngineEnvironment.setMetaDataSyncer(metaDataSyncer); @@ -155,8 +156,9 @@ public void testDeleteShardDirectoryUnderLock() throws IOException { targetInfo.table_info = new HashMap<>(); when(metaDataSyncer.getSearcherTargetInfo()).thenReturn(targetInfo); - when(metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName)).thenReturn(indexLock); - when(metaDataSyncer.getIndexLock(tableName)).thenReturn(indexLock); + when(metaDataSyncer.addShardLock(shardId)).thenReturn(true); + when(metaDataSyncer.deleteShardLock(shardId)).thenReturn(true); + when(metaDataSyncer.getShardLock(shardId)).thenReturn(false); IndexSettings indexSettings = new IndexSettings(build, Settings.EMPTY);