Skip to content

Commit

Permalink
修复反复创建删除相同索引时没有正确加锁的问题 (alibaba#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huaixinww authored Dec 13, 2023
1 parent 8cf4b35 commit 291d4f2
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.nio.file.StandardOpenOption;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -212,6 +211,8 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin
if (metaDataSyncer == null) {
throw new RuntimeException("metaDataSyncer is null while deleting index");
}
metaDataSyncer.addIndexLock(tableName);
LOGGER.debug("get lock while deleting index, table name :[{}]", tableName);
// TODO: ThreadPool的获取是否可以优化
final ThreadPool threadPool = metaDataSyncer.getThreadPool();
asyncRemoveIndexDir(threadPool, tableName, indexDir);
Expand All @@ -225,20 +226,18 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
if (metaDataSyncer == null) {
throw new RuntimeException("metaDataSyncer is null while deleting shard");
}

String tableName = indexSettings.getIndex().getName();
metaDataSyncer.addShardLock(lock.getShardId());
LOGGER.debug("get lock while deleting shard, table name :[{}]", lock.getShardId().getIndexName());
String partitionId = RangeUtil.getRangeName(indexSettings.getNumberOfShards(), lock.getShardId().id());
final ThreadPool threadPool = metaDataSyncer.getThreadPool();
asyncRemoveShardRuntimeDir(threadPool, tableName, partitionId, shardDir);
asyncRemoveShardRuntimeDir(threadPool, lock.getShardId(), partitionId, shardDir);
}

/**
* Asynchronously removes the runtimedata information left behind after an index is deleted.
*/
public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, Path indexDir) {
threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> {
ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName);
indexLock.lock();
LOGGER.debug("get lock while deleting index, table name :[{}]", tableName);
try {
if (metaDataSyncer != null) {
Expand All @@ -260,7 +259,6 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P
LOGGER.warn("remove index dir failed, table name: [{}], error: [{}]", tableName, e);
} finally {
metaDataSyncer.deleteIndexLock(tableName);
indexLock.unlock();
LOGGER.debug("release lock after deleting index, table name :[{}]", tableName);
}
});
Expand All @@ -269,20 +267,17 @@ public void asyncRemoveIndexDir(final ThreadPool threadPool, String tableName, P
/**
* Asynchronously deletes the data inside runtimedata when the number of shards is reduced.
*/
public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String tableName, String partitionId, Path shardDir) {
public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, ShardId shardId, String partitionId, Path shardDir) {
threadPool.executor(HavenaskEnginePlugin.HAVENASK_THREAD_POOL_NAME).execute(() -> {
ReentrantLock indexLock = metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName);
indexLock.lock();
LOGGER.debug("get lock while deleting shard, table name :[{}], partitionId:[{}]", tableName, partitionId);
String tableName = shardId.getIndexName();
try {
if (metaDataSyncer != null) {
metaDataSyncer.setSearcherPendingSync();
try {
checkShardIsDeletedInSearcher(metaDataSyncer, tableName, partitionId);
} catch (IOException e) {
LOGGER.error(
"checkShardIsDeletedInSearcher failed while deleting shard, "
+ "table name: [{}], partitionId:[{}], error: [{}]",
"checkShardIsDeletedInSearcher failed while deleting shard, " + "index: [{}], partitionId:[{}], error: [{}]",
tableName,
partitionId,
e
Expand All @@ -295,9 +290,8 @@ public void asyncRemoveShardRuntimeDir(final ThreadPool threadPool, String table
} catch (Exception e) {
LOGGER.warn("remove shard dir failed, table name: [{}],partitionId:[{}], error: [{}]", tableName, partitionId, e);
} finally {
metaDataSyncer.deleteIndexLock(tableName);
indexLock.unlock();
LOGGER.debug("release lock after deleting index, table name :[{}], partitionId[{}]", tableName, partitionId);
metaDataSyncer.deleteShardLock(shardId);
LOGGER.debug("release lock after deleting shard, table name :[{}], partitionId[{}]", tableName, partitionId);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.mina.util.ConcurrentHashSet;
import org.havenask.cluster.ClusterChangedEvent;
import org.havenask.cluster.ClusterState;
import org.havenask.cluster.ClusterStateApplier;
Expand All @@ -67,6 +65,7 @@
import org.havenask.engine.util.RangeUtil;
import org.havenask.engine.util.Utils;
import org.havenask.index.Index;
import org.havenask.index.shard.ShardId;
import org.havenask.threadpool.ThreadPool;

import com.carrotsearch.hppc.cursors.ObjectCursor;
Expand Down Expand Up @@ -125,8 +124,8 @@ public class MetaDataSyncer extends AbstractLifecycleComponent implements Cluste
private AtomicReference<TargetInfo> searcherTargetInfo = new AtomicReference<>();
private int syncTimes = 0;
private int qrsSyncTimes = 0;

private ConcurrentMap<String, ReentrantLock> indexLockMap = new ConcurrentHashMap<>();
private ConcurrentHashSet<String> indexLockSet = new ConcurrentHashSet<>();
private ConcurrentHashSet<String> shardLockSet = new ConcurrentHashSet<>();

public MetaDataSyncer(
ClusterService clusterService,
Expand Down Expand Up @@ -163,19 +162,31 @@ public ThreadPool getThreadPool() {
return this.threadPool;
}

public ReentrantLock getIndexLockAndCreateIfNotExist(String tableName) {
if (indexLockMap.containsKey(tableName) == false) {
indexLockMap.put(tableName, new ReentrantLock());
}
return indexLockMap.get(tableName);
/**
 * Records {@code tableName} in {@code indexLockSet}.
 *
 * @param tableName name of the table (index) to record
 * @return the value returned by the underlying set's {@code add} call
 */
public boolean addIndexLock(String tableName) {
    final boolean added = indexLockSet.add(tableName);
    return added;
}

/**
 * Reports whether {@code tableName} is currently present in {@code indexLockSet}.
 *
 * @param tableName name of the table (index) to look up
 * @return {@code true} if the set contains {@code tableName}, {@code false} otherwise
 */
public boolean getIndexLock(String tableName) {
    final boolean present = indexLockSet.contains(tableName);
    return present;
}

/**
 * Removes {@code tableName} from {@code indexLockSet}.
 *
 * @param tableName name of the table (index) to remove
 * @return the value returned by the underlying set's {@code remove} call
 */
public boolean deleteIndexLock(String tableName) {
    final boolean removed = indexLockSet.remove(tableName);
    return removed;
}

/**
 * Records the given shard in {@code shardLockSet}.
 *
 * <p>The entry key is the shard's index name and shard id joined by an underscore.
 *
 * @param shardId shard to record
 * @return the value returned by the underlying set's {@code add} call
 */
public boolean addShardLock(ShardId shardId) {
    final String lockKey = shardId.getIndexName() + "_" + shardId.getId();
    final boolean added = shardLockSet.add(lockKey);
    return added;
}

public ReentrantLock getIndexLock(String tableName) {
return indexLockMap.get(tableName);
/**
 * Reports whether the given shard is currently present in {@code shardLockSet}.
 *
 * <p>The lookup key is the shard's index name and shard id joined by an underscore.
 *
 * @param shardId shard to look up
 * @return {@code true} if the set contains the shard's key, {@code false} otherwise
 */
public boolean getShardLock(ShardId shardId) {
    final String lockKey = shardId.getIndexName() + "_" + shardId.getId();
    final boolean present = shardLockSet.contains(lockKey);
    return present;
}

public void deleteIndexLock(String tableName) {
indexLockMap.remove(tableName);
/**
 * Removes the given shard from {@code shardLockSet}.
 *
 * <p>The entry key is the shard's index name and shard id joined by an underscore.
 *
 * @param shardId shard to remove
 * @return the value returned by the underlying set's {@code remove} call
 */
public boolean deleteShardLock(ShardId shardId) {
    final String lockKey = shardId.getIndexName() + "_" + shardId.getId();
    final boolean removed = shardLockSet.remove(lockKey);
    return removed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

package org.havenask.engine.index;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.HavenaskException;
Expand All @@ -43,15 +40,7 @@ public HavenaskIndexEventListener(HavenaskEngineEnvironment env, MetaDataSyncer
@Override
public void afterIndexMappingUpdate(IndexService indexService) {
String tableName = indexService.index().getName();
ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName);
try {
if (indexLock != null) {
if (indexLock.tryLock(60, TimeUnit.SECONDS)) {
LOGGER.debug("get lock while creating index, table name :[{}]", tableName);
} else {
LOGGER.debug("failed to get lock while creating index, out of time, table name :[{}]", tableName);
}
}
BizConfigGenerator.generateBiz(
tableName,
indexService.getIndexSettings().getSettings(),
Expand All @@ -66,34 +55,14 @@ public void afterIndexMappingUpdate(IndexService indexService) {
);
} catch (Exception e) {
throw new HavenaskException("generate havenask config error : ", e);
} finally {
if (indexLock != null) {
try {
indexLock.unlock();
LOGGER.debug("release lock after creating index, table name :[{}]", tableName);
} catch (IllegalMonitorStateException e) {
LOGGER.error("release lock error after creating index", e);
}
}
}
}

@Override
public void afterIndexShardCreated(IndexShard indexShard) {
String tableName = indexShard.shardId().getIndexName();
ReentrantLock indexLock = metaDataSyncer.getIndexLock(tableName);
checkIndexIsDeleted(indexShard);

try {
if (indexLock != null) {
if (indexLock.tryLock(60, TimeUnit.SECONDS)) {
LOGGER.debug("get lock while creating shard, index name: [{}], shardId :[{}]", tableName, indexShard.shardId().getId());
} else {
LOGGER.debug(
"failed to get lock while creating shard, out of time, index name: [{}], shardId :[{}]",
tableName,
indexShard.shardId().getId()
);
}
}
// 初始化segment信息
RuntimeSegmentGenerator.generateRuntimeSegment(
indexShard.shardId(),
Expand All @@ -104,19 +73,29 @@ public void afterIndexShardCreated(IndexShard indexShard) {
);
} catch (Exception e) {
throw new HavenaskException("generate havenask config error", e);
} finally {
if (indexLock != null) {
try {
indexLock.unlock();
LOGGER.debug(
"release lock after creating shard, table name :[{}], shardId :[{}]",
tableName,
indexShard.shardId().getId()
);
} catch (IllegalMonitorStateException e) {
LOGGER.error("release lock error after creating shard", e);
}
}

/**
 * Waits until neither an index-level nor a shard-level lock entry is registered in
 * {@code metaDataSyncer} for the shard being created.
 *
 * <p>Polls {@code getIndexLock} and {@code getShardLock} up to 60 times, sleeping one second
 * between attempts, and returns as soon as both report {@code false}. If entries are still
 * present after the last attempt, an error is logged and the method returns anyway (best
 * effort; no exception is thrown). If the thread is interrupted while sleeping, the error is
 * logged and the interrupt status is restored so that callers can observe the interruption.
 *
 * @param indexShard the shard being created; its shard id and index name are the lookup keys
 */
private void checkIndexIsDeleted(IndexShard indexShard) {
    int loopCount = 60;
    int sleepTime = 1000;

    String tableName = indexShard.shardId().getIndexName();
    try {
        while (loopCount > 0) {
            boolean indexLockExist = metaDataSyncer.getIndexLock(tableName);
            boolean shardLockExist = metaDataSyncer.getShardLock(indexShard.shardId());
            if (indexLockExist == false && shardLockExist == false) {
                break;
            }
            Thread.sleep(sleepTime);
            loopCount--;
        }
        // loopCount only reaches 0 when every attempt saw a lock entry; a break leaves it > 0.
        if (loopCount == 0) {
            LOGGER.error("checkIndexIsDeleted out of time while create shard");
        }
    } catch (InterruptedException e) {
        LOGGER.error("checkIndexIsDeleted error while create shard", e);
        // Restore the interrupt flag cleared by Thread.sleep so the caller can react to it.
        Thread.currentThread().interrupt();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ public void testDeleteIndexDirectoryUnderLock() throws IOException {
targetInfo.table_info = new HashMap<>();
when(metaDataSyncer.getSearcherTargetInfo()).thenReturn(targetInfo);

when(metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName)).thenReturn(indexLock);
when(metaDataSyncer.getIndexLock(tableName)).thenReturn(indexLock);
when(metaDataSyncer.addIndexLock(tableName)).thenReturn(true);
when(metaDataSyncer.getIndexLock(tableName)).thenReturn(true);
when(metaDataSyncer.deleteIndexLock(tableName)).thenReturn(true);

havenaskEngineEnvironment.setMetaDataSyncer(metaDataSyncer);

Expand Down Expand Up @@ -155,8 +156,9 @@ public void testDeleteShardDirectoryUnderLock() throws IOException {
targetInfo.table_info = new HashMap<>();
when(metaDataSyncer.getSearcherTargetInfo()).thenReturn(targetInfo);

when(metaDataSyncer.getIndexLockAndCreateIfNotExist(tableName)).thenReturn(indexLock);
when(metaDataSyncer.getIndexLock(tableName)).thenReturn(indexLock);
when(metaDataSyncer.addShardLock(shardId)).thenReturn(true);
when(metaDataSyncer.deleteShardLock(shardId)).thenReturn(true);
when(metaDataSyncer.getShardLock(shardId)).thenReturn(false);

IndexSettings indexSettings = new IndexSettings(build, Settings.EMPTY);

Expand Down

0 comments on commit 291d4f2

Please sign in to comment.