HBASE-23102: Improper Usage of Map putIfAbsent #828

Merged · 2 commits · Nov 17, 2019
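The core pattern in the changes below is replacing putIfAbsent calls with computeIfAbsent (plus a few ConcurrentMap field-type and logging cleanups): the value is only constructed when the key is actually missing, and the call returns the mapping that ends up in the map. A minimal standalone sketch of the difference, using hypothetical class and variable names rather than the HBase types:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentVsComputeIfAbsent {
  // Hypothetical value type standing in for FailedProcedure, BufferNode, QuotaState, etc.
  static class ExpensiveValue {
    ExpensiveValue(String key) {
      System.out.println("constructed value for " + key);
    }
  }

  public static void main(String[] args) {
    ConcurrentMap<String, ExpensiveValue> map = new ConcurrentHashMap<>();

    // putIfAbsent: the value is constructed up front even when the key is already present,
    // and the call returns the *previous* value (or null), so callers that want the winning
    // mapping still need an extra null check.
    ExpensiveValue candidate = new ExpensiveValue("a");          // always constructed
    ExpensiveValue previous = map.putIfAbsent("a", candidate);
    ExpensiveValue winner = previous != null ? previous : candidate;

    // computeIfAbsent: the mapping function runs only when the key is absent, and the call
    // returns whatever ends up mapped, so no throwaway object and no null check.
    ExpensiveValue same = map.computeIfAbsent("a", ExpensiveValue::new); // no construction here
    System.out.println(same == winner); // true
  }
}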
@@ -828,10 +828,12 @@ public void setFailureResultForNonce(NonceKey nonceKey, String procName, User pr
       return;
     }
 
-    Procedure<TEnvironment> proc =
-        new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
+    completed.computeIfAbsent(procId, (key) -> {
+      Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(),
+          procName, procOwner, nonceKey, exception);
 
-    completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
+      return new CompletedProcedureRetainer<>(proc);
+    });
   }
 
   // ==========================================================================
@@ -147,8 +147,7 @@ public void join() {
    */
   public void addNode(final TRemote key) {
     assert key != null: "Tried to add a node with a null key";
-    final BufferNode newNode = new BufferNode(key);
-    nodeMap.putIfAbsent(key, newNode);
+    nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
   }
 
   /**
@@ -581,9 +581,9 @@ private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
       ServerName currServer = entry.getValue();
       RegionInfo currRegion = entry.getKey();
       if (rsGroupInfo.getTables().contains(currTable)) {
-        assignments.putIfAbsent(currTable, new HashMap<>());
-        assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>());
-        assignments.get(currTable).get(currServer).add(currRegion);
+        assignments.computeIfAbsent(currTable, key -> new HashMap<>())
+            .computeIfAbsent(currServer, key -> new ArrayList<>())
+            .add(currRegion);
       }
     }
 
@@ -595,12 +595,16 @@ private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
     }
 
     // add all tables that are members of the group
-    for(TableName tableName : rsGroupInfo.getTables()) {
-      if(assignments.containsKey(tableName)) {
-        result.put(tableName, new HashMap<>());
-        result.get(tableName).putAll(serverMap);
-        result.get(tableName).putAll(assignments.get(tableName));
-        LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName));
+    for (TableName tableName : rsGroupInfo.getTables()) {
+      if (assignments.containsKey(tableName)) {
+        Map<ServerName, List<RegionInfo>> tableResults = new HashMap<>(serverMap);
+
+        Map<ServerName, List<RegionInfo>> tableAssignments = assignments.get(tableName);
+        tableResults.putAll(tableAssignments);
+
+        result.put(tableName, tableResults);
+
+        LOG.debug("Adding assignments for {}: {}", tableName, tableAssignments);
       }
     }
 
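For nested collections such as the assignments map above, chaining computeIfAbsent replaces the putIfAbsent-then-get sequence with a single traversal per level. A small self-contained sketch of the same idiom, with plain String keys standing in for TableName/ServerName/RegionInfo:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NestedComputeIfAbsent {
  public static void main(String[] args) {
    // table -> server -> regions, mirroring the structure in the code above
    Map<String, Map<String, List<String>>> assignments = new HashMap<>();

    // Old style: three separate lookups, plus a throwaway HashMap/ArrayList whenever the
    // key is already present.
    assignments.putIfAbsent("t1", new HashMap<>());
    assignments.get("t1").putIfAbsent("server1", new ArrayList<>());
    assignments.get("t1").get("server1").add("region-a");

    // computeIfAbsent chains the levels and only allocates the containers that are missing.
    assignments.computeIfAbsent("t2", key -> new HashMap<>())
        .computeIfAbsent("server1", key -> new ArrayList<>())
        .add("region-b");

    System.out.println(assignments);
  }
}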
@@ -63,7 +63,7 @@ public class ExecutorService {
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
 
   // hold the all the executors created in a map addressable by their names
-  private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();
 
   // Name of the server hosting this executor service.
   private final String servername;
@@ -87,18 +87,18 @@ public ExecutorService(final String servername) {
    */
  @VisibleForTesting
  public void startExecutorService(String name, int maxThreads) {
-    if (this.executorMap.get(name) != null) {
-      throw new RuntimeException("An executor service with the name " + name +
-        " is already running!");
-    }
-    Executor hbes = new Executor(name, maxThreads);
-    if (this.executorMap.putIfAbsent(name, hbes) != null) {
-      throw new RuntimeException("An executor service with the name " + name +
-        " is already running (2)!");
-    }
-    LOG.debug("Starting executor service name=" + name +
-      ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
-      ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
+    Executor hbes = this.executorMap.compute(name, (key, value) -> {
+      if (value != null) {
+        throw new RuntimeException("An executor service with the name " + key +
+            " is already running!");
+      }
+      return new Executor(key, maxThreads);
+    });
+
+    LOG.debug(
+        "Starting executor service name={}, corePoolSize={}, maxPoolSize={}",
+        name, hbes.threadPoolExecutor.getCorePoolSize(),
+        hbes.threadPoolExecutor.getMaximumPoolSize());
   }
 
   boolean isExecutorServiceRunning(String name) {
@@ -134,7 +134,8 @@ public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
   public void startExecutorService(final ExecutorType type, final int maxThreads) {
     String name = type.getExecutorName(this.servername);
     if (isExecutorServiceRunning(name)) {
-      LOG.debug("Executor service " + toString() + " already running on " + this.servername);
+      LOG.debug("Executor service {} already running on {}", this,
+          this.servername);
       return;
     }
     startExecutorService(name, maxThreads);
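The startExecutorService change relies on ConcurrentMap.compute to make the check-and-insert atomic: if the remapping function throws an unchecked exception, the exception propagates to the caller and the map is left unchanged, so the two separate RuntimeException branches collapse into one. A standalone sketch of that behavior, with hypothetical names:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ComputeFailIfPresent {
  static class Executor {
    final String name;
    Executor(String name) {
      this.name = name;
    }
  }

  static final ConcurrentMap<String, Executor> executors = new ConcurrentHashMap<>();

  // Registers a new executor, failing atomically if one with the same name already exists.
  static Executor register(String name) {
    return executors.compute(name, (key, value) -> {
      if (value != null) {
        // Thrown from the remapping function: it propagates to the caller and the
        // existing mapping stays in place.
        throw new IllegalStateException("An executor named " + key + " is already running");
      }
      return new Executor(key);
    });
  }

  public static void main(String[] args) {
    register("rs-open-region");
    try {
      register("rs-open-region");
    } catch (IllegalStateException expected) {
      System.out.println(expected.getMessage());
    }
  }
}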
@@ -161,9 +161,8 @@ private void prepareTableToReopenRegionsMap(
     }
     LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
       regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
-    tableToReopenRegionsMap.putIfAbsent(tableName, new ArrayList<>());
-    tableToReopenRegionsMap.get(tableName).add(regionName);
-
+    tableToReopenRegionsMap
+        .computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
   }
 
   // hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore
@@ -112,9 +112,8 @@ public boolean isRegionInRegionStates(final RegionInfo hri) {
   // ==========================================================================
   @VisibleForTesting
   RegionStateNode createRegionStateNode(RegionInfo regionInfo) {
-    RegionStateNode newNode = new RegionStateNode(regionInfo, regionInTransition);
-    RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
-    return oldNode != null ? oldNode : newNode;
+    return regionsMap.computeIfAbsent(regionInfo.getRegionName(),
+      key -> new RegionStateNode(regionInfo, regionInTransition));
   }
 
   public RegionStateNode getOrCreateRegionStateNode(RegionInfo regionInfo) {
@@ -556,7 +555,7 @@ public Map<TableName, Map<ServerName, List<RegionInfo>>> getAssignmentsForBalanc
       // Add online servers with no assignment for the table.
       for (Map<ServerName, List<RegionInfo>> table : result.values()) {
         for (ServerName serverName : serverMap.keySet()) {
-          table.putIfAbsent(serverName, new ArrayList<>());
+          table.computeIfAbsent(serverName, key -> new ArrayList<>());
         }
       }
     } else {
@@ -677,13 +676,7 @@ public Exception getException() {
 
   public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
     final byte[] key = regionNode.getRegionInfo().getRegionName();
-    RegionFailedOpen node = regionFailedOpen.get(key);
-    if (node == null) {
-      RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
-      RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
-      node = oldNode != null ? oldNode : newNode;
-    }
-    return node;
+    return regionFailedOpen.computeIfAbsent(key, (k) -> new RegionFailedOpen(regionNode));
   }
 
   public RegionFailedOpen getFailedOpen(final RegionInfo regionInfo) {
@@ -714,13 +707,7 @@ public List<RegionState> getRegionFailedOpen() {
    * to {@link #getServerNode(ServerName)} where we can.
    */
   public ServerStateNode getOrCreateServer(final ServerName serverName) {
-    ServerStateNode node = serverMap.get(serverName);
-    if (node == null) {
-      node = new ServerStateNode(serverName);
-      ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
-      node = oldNode != null ? oldNode : node;
-    }
-    return node;
+    return serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key));
   }
 
   public void removeServer(final ServerName serverName) {
@@ -18,6 +18,7 @@
 
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,7 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
   private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
       new FileArchiverNotifierFactoryImpl();
   private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
-  private final ConcurrentHashMap<TableName,FileArchiverNotifier> CACHE;
+  private final ConcurrentMap<TableName,FileArchiverNotifier> CACHE;
 
   private FileArchiverNotifierFactoryImpl() {
     CACHE = new ConcurrentHashMap<>();
@@ -62,15 +63,10 @@ static void reset() {
    * @param tn The table to obtain a notifier for
    * @return The notifier for the given {@code tablename}.
    */
-  public FileArchiverNotifier get(
-      Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+  public FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs,
      TableName tn) {
     // Ensure that only one instance is exposed to callers
-    final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn);
-    final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping);
-    if (previousMapping == null) {
-      return newMapping;
-    }
-    return previousMapping;
+    return CACHE.computeIfAbsent(tn, key -> new FileArchiverNotifierImpl(conn, conf, fs, key));
   }
 
   public int getCacheSize() {
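The notifier factory above is the classic memoizing-cache use of computeIfAbsent: one call either returns the cached instance or builds and caches a new one, and on a ConcurrentHashMap concurrent callers for the same key all observe the same object. A minimal sketch of the pattern with a hypothetical notifier type in place of FileArchiverNotifierImpl:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class NotifierCache {
  interface Notifier {
    void onArchive(String file);
  }

  // Hypothetical stand-in for the real notifier implementation.
  static class LoggingNotifier implements Notifier {
    private final String table;
    LoggingNotifier(String table) {
      this.table = table;
    }
    @Override
    public void onArchive(String file) {
      System.out.println(table + " archived " + file);
    }
  }

  private final ConcurrentMap<String, Notifier> cache = new ConcurrentHashMap<>();

  // With putIfAbsent a new notifier would be built on every call, even on cache hits;
  // computeIfAbsent builds it only on the first call for a given table.
  Notifier get(String table) {
    return cache.computeIfAbsent(table, LoggingNotifier::new);
  }

  public static void main(String[] args) {
    NotifierCache notifiers = new NotifierCache();
    Notifier a = notifiers.get("t1");
    Notifier b = notifiers.get("t1");
    System.out.println(a == b); // true: same cached instance
  }
}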
@@ -32,6 +32,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ScheduledChore;
@@ -69,10 +70,10 @@ public class QuotaCache implements Stoppable {
   // for testing purpose only, enforce the cache to be always refreshed
   static boolean TEST_FORCE_REFRESH = false;
 
-  private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
+  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
+  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
       new ConcurrentHashMap<>();
   private volatile boolean exceedThrottleQuotaEnabled = false;
   // factors used to divide cluster scope quota into machine scope quota
@@ -174,7 +175,7 @@ protected boolean isExceedThrottleQuotaEnabled() {
    * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
    * returned and the quota request will be enqueued for the next cache refresh.
    */
-  private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
+  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap,
      final K key) {
    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
  }
@@ -223,17 +224,18 @@ public QuotaRefresherChore(final int period, final Stoppable stoppable) {
     protected void chore() {
       // Prefetch online tables/namespaces
       for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
-        if (table.isSystemTable()) continue;
-        if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
-          QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
-        }
-        String ns = table.getNamespaceAsString();
-        if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
-          QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
+        if (table.isSystemTable()) {
+          continue;
         }
+        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
+
+        final String ns = table.getNamespaceAsString();
+
+        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
       }
-      QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
-        new QuotaState());
+
+      QuotaCache.this.regionServerQuotaCache.computeIfAbsent(
+        QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
 
       updateQuotaFactors();
       fetchNamespaceQuotaState();
@@ -319,7 +321,7 @@ private void fetchExceedThrottleQuota() {
   }
 
   private <K, V extends QuotaState> void fetch(final String type,
-      final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
+      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
     long now = EnvironmentEdgeManager.currentTime();
     long refreshPeriod = getPeriod();
     long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
@@ -19,6 +19,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -78,7 +79,7 @@ public class StoreHotnessProtector {
   private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
   private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
 
-  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
+  private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
   private final Region region;
 
@@ -101,7 +102,7 @@ public void init(Configuration conf) {
   public void update(Configuration conf) {
     init(conf);
     preparePutToStoreMap.clear();
-    LOG.debug("update config: " + toString());
+    LOG.debug("update config: {}", this);
   }
 
   public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
@@ -121,13 +122,9 @@ public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyExcept
 
           //we need to try to add #preparePutCount at first because preparePutToStoreMap will be
          //cleared when changing the configuration.
-          preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
-          AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
-          if (preparePutCounter == null) {
-            preparePutCounter = new AtomicInteger();
-            preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
-          }
-          int preparePutCount = preparePutCounter.incrementAndGet();
+          int preparePutCount = preparePutToStoreMap
+              .computeIfAbsent(e.getKey(), key -> new AtomicInteger())
+              .incrementAndGet();
           if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
              || preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
            tooBusyStore = (tooBusyStore == null ?
@@ -146,9 +143,7 @@ public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyExcept
       String msg =
          "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
              + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(msg);
-      }
+      LOG.trace(msg);
       throw new RegionTooBusyException(msg);
     }
   }
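The preparePutToStoreMap change shows the common per-key counter idiom: computeIfAbsent hands back either the existing AtomicInteger or a freshly inserted one, so the increment can be chained directly onto the lookup. A short sketch of that idiom in isolation (the real map above is a ConcurrentSkipListMap keyed by byte[]; a ConcurrentHashMap with String keys is used here for brevity):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class PerKeyCounters {
  private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

  // Returns the new count for the given key. The AtomicInteger is created at most once per
  // key; every caller then increments the same shared counter.
  int increment(String key) {
    return counters.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
  }

  int current(String key) {
    AtomicInteger counter = counters.get(key);
    return counter == null ? 0 : counter.get();
  }

  public static void main(String[] args) {
    PerKeyCounters counters = new PerKeyCounters();
    counters.increment("store-a");
    counters.increment("store-a");
    System.out.println(counters.current("store-a")); // 2
  }
}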
@@ -305,24 +305,30 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
   }
 
   private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
-    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
-    if (extant != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
-          walGroupId);
-      }
-    } else {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
-      }
-      ReplicationSourceWALReader walReader =
-        createNewWALReader(walGroupId, queue, worker.getStartPosition());
-      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
-        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
-      worker.setWALReader(walReader);
-      worker.startup(this::uncaughtException);
-    }
+    workerThreads.compute(walGroupId, (key, value) -> {
+      if (value != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "{} Someone has beat us to start a worker thread for wal group {}",
+            logPeerId(), key);
+        }
+        return value;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
+        }
+        ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
+        ReplicationSourceWALReader walReader =
+          createNewWALReader(walGroupId, queue, worker.getStartPosition());
+        Threads.setDaemonThreadRunning(
+          walReader, Thread.currentThread().getName()
+            + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
+          this::uncaughtException);
+        worker.setWALReader(walReader);
+        worker.startup(this::uncaughtException);
+        return worker;
+      }
+    });
   }
 
   @Override
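In tryStartNewShipper the whole create-and-start sequence moves inside compute, so a losing caller no longer builds a shipper that is immediately thrown away, and the worker for a wal group is started at most once. Where the "someone beat us" logging is not needed, computeIfAbsent gives the same guarantee more compactly; a hedged sketch with a hypothetical worker type rather than the HBase classes:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class WorkerRegistry {
  // Hypothetical stand-in for ReplicationSourceShipper.
  static class Worker extends Thread {
    Worker(String group) {
      super("worker-" + group);
      setDaemon(true);
    }
    @Override
    public void run() {
      // ... ship entries for the group ...
    }
  }

  private final ConcurrentMap<String, Worker> workers = new ConcurrentHashMap<>();

  // The mapping function runs only for the caller that actually inserts the entry, so the
  // worker is constructed and started at most once per group; later callers just get the
  // existing instance. ConcurrentHashMap may block other updates to the same key while the
  // function runs, so the work done inside it should stay short.
  Worker getOrStart(String group) {
    return workers.computeIfAbsent(group, key -> {
      Worker worker = new Worker(key);
      worker.start();
      return worker;
    });
  }

  public static void main(String[] args) {
    WorkerRegistry registry = new WorkerRegistry();
    Worker first = registry.getOrStart("wal-group-1");
    Worker second = registry.getOrStart("wal-group-1");
    System.out.println(first == second); // true: only one worker was started
  }
}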