From 291d4f2886d75f7f68cb8bff4b85ed482dddd76c Mon Sep 17 00:00:00 2001 From: Huaixinww <141887897+Huaixinww@users.noreply.github.com> Date: Wed, 13 Dec 2023 15:53:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=8D=E5=A4=8D=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E5=88=A0=E9=99=A4=E7=9B=B8=E5=90=8C=E7=B4=A2=E5=BC=95?= =?UTF-8?q?=E6=97=B6=E6=B2=A1=E6=9C=89=E6=AD=A3=E7=A1=AE=E5=8A=A0=E9=94=81?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98=20(#357)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../engine/HavenaskEngineEnvironment.java | 26 +++---- .../org/havenask/engine/MetaDataSyncer.java | 39 +++++++---- .../index/HavenaskIndexEventListener.java | 67 +++++++------------ .../HavenaskEngineEnvironmentTests.java | 10 +-- 4 files changed, 64 insertions(+), 78 deletions(-) diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java index e4343faa..e948e748 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEngineEnvironment.java @@ -42,7 +42,6 @@ import java.nio.file.StandardOpenOption; import java.util.Locale; import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.logging.log4j.LogManager; @@ -212,6 +211,8 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin if (metaDataSyncer == null) { throw new RuntimeException("metaDataSyncer is null while deleting index"); } + metaDataSyncer.addIndexLock(tableName); + LOGGER.debug("get lock while deleting index, table name :[{}]", tableName); // TODO: ThreadPool的获取是否可以优化 final ThreadPool threadPool = metaDataSyncer.getThreadPool(); asyncRemoveIndexDir(threadPool, tableName, indexDir); @@ -225,11 +226,11 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet if (metaDataSyncer == null) { throw new RuntimeException("metaDataSyncer is null while deleting shard"); } - - String tableName = indexSettings.getIndex().getName(); + metaDataSyncer.addShardLock(lock.getShardId()); + LOGGER.debug("get lock while deleting shard, table name :[{}]", lock.getShardId().getIndexName()); String partitionId = RangeUtil.getRangeName(indexSettings.getNumberOfShards(), lock.getShardId().id()); final ThreadPool threadPool = metaDataSyncer.getThreadPool(); - asyncRemoveShardRuntimeDir(threadPool, tableName, partitionId, shardDir); + asyncRemoveShardRuntimeDir(threadPool, lock.getShardId(), partitionId, shardDir); } /** @@ -237,8 +238,6 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet */ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, Path indexDir) { threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> { - ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName); - indexLock.lock(); LOGGER.debug("get lock while deleting index, table name :[{}]", tableName); try { if (metaDataSyncer != null) { @@ -260,7 +259,6 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P LOGGER.warn("remove index dir failed, table name: [{}], error: [{}]", tableName, e); } finally { metaDataSyncer.deleteIndexLock(tableName); - indexLock.unlock(); LOGGER.debug("release lock after deleting index, table name :[{}]", tableName); } }); @@ -269,11 +267,9 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P /** * 异步删除减少shard时runtimedata内的数据信息 */ - public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String tableName, String partitionId, Path shardDir) { + public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, ShardId shardId, String partitionId, Path shardDir) { threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> { - ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName); - indexLock.lock(); - LOGGER.debug("get lock while deleting shard, table name :[{}], partitionId:[{}]", tableName, partitionId); + String tableName = shardId.getIndexName(); try { if (metaDataSyncer != null) { metaDataSyncer.setSearcherPendingSync(); @@ -281,8 +277,7 @@ public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String table checkShardIsDeletedInSearcher(metaDataSyncer, tableName, partitionId); } catch (IOException e) { LOGGER.error( - "checkShardIsDeletedInSearcher failed while deleting shard, " - + "table name: [{}], partitionId:[{}], error: [{}]", + "checkShardIsDeletedInSearcher failed while deleting shard, " + "index: [{}], partitionId:[{}], error: [{}]", tableName, partitionId, e @@ -295,9 +290,8 @@ public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String table } catch (Exception e) { LOGGER.warn("remove shard dir failed, table name: [{}],partitionId:[{}], error: [{}]", tableName, partitionId, e); } finally { - metaDataSyncer.deleteIndexLock(tableName); - indexLock.unlock(); - LOGGER.debug("release lock after deleting index, table name :[{}], partitionId[{}]", tableName, partitionId); + metaDataSyncer.deleteShardLock(shardId); + LOGGER.debug("release lock after deleting shard, table name :[{}], partitionId[{}]", tableName, partitionId); } }); } diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java index e6ee61d3..824ccd8d 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java @@ -31,16 +31,14 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.mina.util.ConcurrentHashSet; import org.havenask.cluster.ClusterChangedEvent; import org.havenask.cluster.ClusterState; import org.havenask.cluster.ClusterStateApplier; @@ -67,6 +65,7 @@ import org.havenask.engine.util.RangeUtil; import org.havenask.engine.util.Utils; import org.havenask.index.Index; +import org.havenask.index.shard.ShardId; import org.havenask.threadpool.ThreadPool; import com.carrotsearch.hppc.cursors.ObjectCursor; @@ -125,8 +124,8 @@ public class MetaDataSyncer extends AbstractLifecycleComponent implements Cluste private AtomicReference searcherTargetInfo = new AtomicReference<>(); private int syncTimes = 0; private int qrsSyncTimes = 0; - - private ConcurrentMap indexLockMap = new ConcurrentHashMap<>(); + private ConcurrentHashSet indexLockSet = new ConcurrentHashSet<>(); + private ConcurrentHashSet shardLockSet = new ConcurrentHashSet<>(); public MetaDataSyncer( ClusterService clusterService, @@ -163,19 +162,31 @@ public ThreadPool getThreadPool() { return this.threadPool; } - public ReentrantLock getIndexLockAndCreateIfNotExist(String tableName) { - if (indexLockMap.containsKey(tableName) == false) { - indexLockMap.put(tableName, new ReentrantLock()); - } - return indexLockMap.get(tableName); + public boolean addIndexLock(String tableName) { + return indexLockSet.add(tableName); + } + + public boolean getIndexLock(String tableName) { + return indexLockSet.contains(tableName); + } + + public boolean deleteIndexLock(String tableName) { + return indexLockSet.remove(tableName); + } + + public boolean addShardLock(ShardId shardId) { + String tableWithShardId = shardId.getIndexName() + "_" + shardId.getId(); + return shardLockSet.add(tableWithShardId); } - public ReentrantLock getIndexLock(String tableName) { - return indexLockMap.get(tableName); + public boolean getShardLock(ShardId shardId) { + String tableWithShardId = shardId.getIndexName() + "_" + shardId.getId(); + return shardLockSet.contains(tableWithShardId); } - public void deleteIndexLock(String tableName) { - indexLockMap.remove(tableName); + public boolean deleteShardLock(ShardId shardId) { + String tableWithShardId = shardId.getIndexName() + "_" + shardId.getId(); + return shardLockSet.remove(tableWithShardId); } @Override diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java index 18882a5a..0fd19b51 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/HavenaskIndexEventListener.java @@ -14,9 +14,6 @@ package org.havenask.engine.index; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.havenask.HavenaskException; @@ -43,15 +40,7 @@ public HavenaskIndexEventListener(HavenaskEngineEnvironment env, MetaDataSyncer @Override public void afterIndexMappingUpdate(IndexService indexService) { String tableName = indexService.index().getName(); - ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName); try { - if (indexLock != null) { - if (indexLock.tryLock(60, TimeUnit.SECONDS)) { - LOGGER.debug("get lock while creating index, table name :[{}]", tableName); - } else { - LOGGER.debug("failed to get lock while creating index, out of time, table name :[{}]", tableName); - } - } BizConfigGenerator.generateBiz( tableName, indexService.getIndexSettings().getSettings(), @@ -66,34 +55,14 @@ public void afterIndexMappingUpdate(IndexService indexService) { ); } catch (Exception e) { throw new HavenaskException("generate havenask config error : ", e); - } finally { - if (indexLock != null) { - try { - indexLock.unlock(); - LOGGER.debug("release lock after creating index, table name :[{}]", tableName); - } catch (IllegalMonitorStateException e) { - LOGGER.error("release lock error after creating index", e); - } - } } } @Override public void afterIndexShardCreated(IndexShard indexShard) { - String tableName = indexShard.shardId().getIndexName(); - ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName); + checkIndexIsDeleted(indexShard); + try { - if (indexLock != null) { - if (indexLock.tryLock(60, TimeUnit.SECONDS)) { - LOGGER.debug("get lock while creating shard, index name: [{}], shardId :[{}]", tableName, indexShard.shardId().getId()); - } else { - LOGGER.debug( - "failed to get lock while creating shard, out of time, index name: [{}], shardId :[{}]", - tableName, - indexShard.shardId().getId() - ); - } - } // 初始化segment信息 RuntimeSegmentGenerator.generateRuntimeSegment( indexShard.shardId(), @@ -104,19 +73,29 @@ public void afterIndexShardCreated(IndexShard indexShard) { ); } catch (Exception e) { throw new HavenaskException("generate havenask config error", e); - } finally { - if (indexLock != null) { - try { - indexLock.unlock(); - LOGGER.debug( - "release lock after creating shard, table name :[{}], shardId :[{}]", - tableName, - indexShard.shardId().getId() - ); - } catch (IllegalMonitorStateException e) { - LOGGER.error("release lock error after creating shard", e); + } + } + + private void checkIndexIsDeleted(IndexShard indexShard) { + int loopCount = 60; + int sleepTime = 1000; + + String tableName = indexShard.shardId().getIndexName(); + try { + while (loopCount > 0) { + boolean indexLockExist = metaDataSyncer.getIndexLock(tableName); + boolean ShardLockExist = metaDataSyncer.getShardLock(indexShard.shardId()); + if (indexLockExist == false && ShardLockExist == false) { + break; } + Thread.sleep(sleepTime); + loopCount--; + } + if (loopCount == 0) { + LOGGER.error("checkIndexIsDeleted out of time while create shard"); } + } catch (InterruptedException e) { + LOGGER.error("checkIndexIsDeleted error while create shard", e); } } } diff --git a/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java b/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java index 47481345..7264d35d 100644 --- a/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java +++ b/elastic-fed/modules/havenask-engine/src/test/java/org/havenask/engine/HavenaskEngineEnvironmentTests.java @@ -121,8 +121,9 @@ public void testDeleteIndexDirectoryUnderLock() throws IOException { targetInfo.table_info = new HashMap<>(); when(metaDataSyncer.getSearcherTargetInfo()).thenReturn(targetInfo); - when(metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName)).thenReturn(indexLock); - when(metaDataSyncer.getIndexLock(tableName)).thenReturn(indexLock); + when(metaDataSyncer.addIndexLock(tableName)).thenReturn(true); + when(metaDataSyncer.getIndexLock(tableName)).thenReturn(true); + when(metaDataSyncer.deleteIndexLock(tableName)).thenReturn(true); havenaskEngineEnvironment.setMetaDataSyncer(metaDataSyncer); @@ -155,8 +156,9 @@ public void testDeleteShardDirectoryUnderLock() throws IOException { targetInfo.table_info = new HashMap<>(); when(metaDataSyncer.getSearcherTargetInfo()).thenReturn(targetInfo); - when(metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName)).thenReturn(indexLock); - when(metaDataSyncer.getIndexLock(tableName)).thenReturn(indexLock); + when(metaDataSyncer.addShardLock(shardId)).thenReturn(true); + when(metaDataSyncer.deleteShardLock(shardId)).thenReturn(true); + when(metaDataSyncer.getShardLock(shardId)).thenReturn(false); IndexSettings indexSettings = new IndexSettings(build, Settings.EMPTY);