Skip to content

Commit

Permalink
fix: remove unnecessary synchronized to improve concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui committed Oct 18, 2024
1 parent 01f9848 commit 9754f5b
Showing 1 changed file with 23 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -39,11 +40,11 @@ public class ProducerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
private final ConcurrentMap<String /* group name */, ConcurrentMap<Channel, ClientChannelInfo>> groupChannelTable =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
protected final BrokerStatsManager brokerStatsManager;
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
private final PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
private final List<ProducerChangeListener> producerChangeListenerList = new CopyOnWriteArrayList<>();

public ProducerManager() {
Expand All @@ -63,7 +64,7 @@ public boolean groupOnline(String group) {
return channels != null && !channels.isEmpty();
}

public ConcurrentHashMap<String, ConcurrentHashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
public ConcurrentMap<String, ConcurrentMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
return groupChannelTable;
}

Expand Down Expand Up @@ -95,13 +96,13 @@ public ProducerTableInfo getProducerTable() {
}

public void scanNotActiveChannel() {
Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
Iterator<Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();

while (iterator.hasNext()) {
Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry = iterator.next();
Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry = iterator.next();

final String group = entry.getKey();
final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
final ConcurrentMap<Channel, ClientChannelInfo> chlMap = entry.getValue();

Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) {
Expand Down Expand Up @@ -132,16 +133,13 @@ public void scanNotActiveChannel() {
}
}

public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
boolean removed = false;
if (channel != null) {
for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
for (final Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
final String group = entry.getKey();
final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
entry.getValue();
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
final ConcurrentMap<Channel, ClientChannelInfo> clientChannelInfoTable = entry.getValue();
final ClientChannelInfo clientChannelInfo = clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
clientChannelTable.remove(clientChannelInfo.getClientId());
removed = true;
Expand All @@ -150,7 +148,7 @@ public synchronized boolean doChannelCloseEvent(final String remoteAddr, final C
clientChannelInfo.toString(), remoteAddr, group);
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo);
if (clientChannelInfoTable.isEmpty()) {
ConcurrentHashMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group);
ConcurrentMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group);
if (oldGroupTable != null) {
log.info("unregister a producer group[{}] from groupChannelTable", group);
callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null);
Expand All @@ -163,13 +161,16 @@ public synchronized boolean doChannelCloseEvent(final String remoteAddr, final C
return removed;
}

public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo clientChannelInfoFound = null;
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo clientChannelInfoFound;

ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new ConcurrentHashMap<>();
this.groupChannelTable.put(group, channelTable);
ConcurrentMap<Channel, ClientChannelInfo> prev = this.groupChannelTable.putIfAbsent(group, channelTable);
if (null != prev) {
channelTable = prev;
}
}

clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
Expand All @@ -186,8 +187,8 @@ public synchronized void registerProducer(final String group, final ClientChanne
}
}

public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId());
Expand All @@ -210,7 +211,7 @@ public Channel getAvailableChannel(String groupId) {
return null;
}
List<Channel> channelList;
ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
ConcurrentMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
if (channelClientChannelInfoHashMap != null) {
channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet());
} else {
Expand Down

0 comments on commit 9754f5b

Please sign in to comment.