Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport "HBASE-23102: Improper Usage of Map putIfAbsent (#828)" to branch-2 #5048

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -836,10 +836,12 @@ public void setFailureResultForNonce(NonceKey nonceKey, String procName, User pr
return;
}

Procedure<TEnvironment> proc =
new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
completed.computeIfAbsent(procId, (key) -> {
Procedure<TEnvironment> proc =
new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);

completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
return new CompletedProcedureRetainer<>(proc);
});
}

// ==========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ public void join() {
*/
public void addNode(final TRemote key) {
assert key != null : "Tried to add a node with a null key";
final BufferNode newNode = new BufferNode(key);
nodeMap.putIfAbsent(key, newNode);
nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,9 +694,8 @@ Map<TableName, Map<ServerName, List<RegionInfo>>> getRSGroupAssignmentsByTable(
if (currRegion.isSplitParent()) {
continue;
}
assignments.putIfAbsent(currTable, new HashMap<>());
assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>());
assignments.get(currTable).get(currServer).add(currRegion);
assignments.computeIfAbsent(currTable, key -> new HashMap<>())
.computeIfAbsent(currServer, key -> new ArrayList<>()).add(currRegion);
}
}

Expand All @@ -710,10 +709,14 @@ Map<TableName, Map<ServerName, List<RegionInfo>>> getRSGroupAssignmentsByTable(
// add all tables that are members of the group
for (TableName tableName : rsGroupInfo.getTables()) {
if (assignments.containsKey(tableName)) {
result.put(tableName, new HashMap<>());
result.get(tableName).putAll(serverMap);
result.get(tableName).putAll(assignments.get(tableName));
LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName));
Map<ServerName, List<RegionInfo>> tableResults = new HashMap<>(serverMap);

Map<ServerName, List<RegionInfo>> tableAssignments = assignments.get(tableName);
tableResults.putAll(tableAssignments);

result.put(tableName, tableResults);

LOG.debug("Adding assignments for {}: {}", tableName, tableAssignments);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class ExecutorService {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);

// hold the all the executors created in a map addressable by their names
private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();

// Name of the server hosting this executor service.
private final String servername;
Expand All @@ -84,18 +84,16 @@ public ExecutorService(final String servername) {
*/
public void startExecutorService(final ExecutorConfig config) {
final String name = config.getName();
if (this.executorMap.get(name) != null) {
throw new RuntimeException(
"An executor service with the name " + name + " is already running!");
}
Executor hbes = new Executor(config);
if (this.executorMap.putIfAbsent(name, hbes) != null) {
throw new RuntimeException(
"An executor service with the name " + name + " is already running (2)!");
}
LOG.debug("Starting executor service name=" + name + ", corePoolSize="
+ hbes.threadPoolExecutor.getCorePoolSize() + ", maxPoolSize="
+ hbes.threadPoolExecutor.getMaximumPoolSize());
Executor hbes = this.executorMap.compute(name, (key, value) -> {
if (value != null) {
throw new RuntimeException(
"An executor service with the name " + key + " is already running!");
}
return new Executor(config);
});

LOG.debug("Starting executor service name={}, corePoolSize={}, maxPoolSize={}", name,
hbes.threadPoolExecutor.getCorePoolSize(), hbes.threadPoolExecutor.getMaximumPoolSize());
}

boolean isExecutorServiceRunning(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ private void prepareTableToReopenRegionsMap(
}
LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
tableToReopenRegionsMap.putIfAbsent(tableName, new ArrayList<>());
tableToReopenRegionsMap.get(tableName).add(regionName);

tableToReopenRegionsMap.computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ public boolean isRegionInRegionStates(final RegionInfo hri) {
// RegionStateNode helpers
// ==========================================================================
RegionStateNode createRegionStateNode(RegionInfo regionInfo) {
RegionStateNode newNode = new RegionStateNode(regionInfo, regionInTransition);
RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
return oldNode != null ? oldNode : newNode;
return regionsMap.computeIfAbsent(regionInfo.getRegionName(),
key -> new RegionStateNode(regionInfo, regionInTransition));
}

public RegionStateNode getOrCreateRegionStateNode(RegionInfo regionInfo) {
Expand Down Expand Up @@ -583,7 +582,7 @@ public ServerName getRegionServerOfRegion(RegionInfo regionInfo) {
}
// Add online servers with no assignment for the table.
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
for (ServerName serverName : onlineServers) {
for (ServerName serverName : serverMap.keySet()) {
table.computeIfAbsent(serverName, key -> new ArrayList<>());
}
}
Expand Down Expand Up @@ -703,13 +702,7 @@ public Exception getException() {

public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
final byte[] key = regionNode.getRegionInfo().getRegionName();
RegionFailedOpen node = regionFailedOpen.get(key);
if (node == null) {
RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
node = oldNode != null ? oldNode : newNode;
}
return node;
return regionFailedOpen.computeIfAbsent(key, (k) -> new RegionFailedOpen(regionNode));
}

public RegionFailedOpen getFailedOpen(final RegionInfo regionInfo) {
Expand Down Expand Up @@ -740,13 +733,7 @@ public List<RegionState> getRegionFailedOpen() {
* where we can.
*/
public ServerStateNode getOrCreateServer(final ServerName serverName) {
ServerStateNode node = serverMap.get(serverName);
if (node == null) {
node = new ServerStateNode(serverName);
ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
node = oldNode != null ? oldNode : node;
}
return node;
return serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -34,7 +35,7 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
new FileArchiverNotifierFactoryImpl();
private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
private final ConcurrentHashMap<TableName, FileArchiverNotifier> CACHE;
private final ConcurrentMap<TableName, FileArchiverNotifier> CACHE;

private FileArchiverNotifierFactoryImpl() {
CACHE = new ConcurrentHashMap<>();
Expand All @@ -60,12 +61,7 @@ static void reset() {
public FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs,
TableName tn) {
// Ensure that only one instance is exposed to callers
final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn);
final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping);
if (previousMapping == null) {
return newMapping;
}
return previousMapping;
return CACHE.computeIfAbsent(tn, key -> new FileArchiverNotifierImpl(conn, conf, fs, key));
}

public int getCacheSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
Expand Down Expand Up @@ -62,13 +63,10 @@ public class QuotaCache implements Stoppable {
// for testing purpose only, enforce the cache to be always refreshed
static boolean TEST_FORCE_REFRESH = false;

private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
new ConcurrentHashMap<>();
private volatile boolean exceedThrottleQuotaEnabled = false;
// factors used to divide cluster scope quota into machine scope quota
Expand Down Expand Up @@ -166,8 +164,7 @@ protected boolean isExceedThrottleQuotaEnabled() {
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
* returned and the quota request will be enqueued for the next cache refresh.
*/
private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
final K key) {
private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
}

Expand Down Expand Up @@ -209,17 +206,18 @@ public QuotaRefresherChore(final int period, final Stoppable stoppable) {
protected void chore() {
// Prefetch online tables/namespaces
for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
if (table.isSystemTable()) continue;
if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
}
String ns = table.getNamespaceAsString();
if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
if (table.isSystemTable()) {
continue;
}
QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());

final String ns = table.getNamespaceAsString();

QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
}
QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
new QuotaState());

QuotaCache.this.regionServerQuotaCache
.computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());

updateQuotaFactors();
fetchNamespaceQuotaState();
Expand Down Expand Up @@ -302,7 +300,7 @@ private void fetchExceedThrottleQuota() {
}

private <K, V extends QuotaState> void fetch(final String type,
final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
long now = EnvironmentEdgeManager.currentTime();
long refreshPeriod = getPeriod();
long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class StoreHotnessProtector {
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;

private final Map<byte[], AtomicInteger> preparePutToStoreMap =
private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
private final Region region;

Expand Down Expand Up @@ -119,7 +120,7 @@ private static void logDisabledMessageOnce() {
public void update(Configuration conf) {
init(conf);
preparePutToStoreMap.clear();
LOG.debug("update config: " + toString());
LOG.debug("update config: {}", this);
}

public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
Expand Down