Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
dongeforever committed Aug 1, 2018
2 parents 0f76489 + 795eae6 commit 2b692c9
Show file tree
Hide file tree
Showing 291 changed files with 11,347 additions and 3,607 deletions.
10 changes: 4 additions & 6 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
Please do not create a Pull Request without creating an issue first.

## What is the purpose of the change

XXXXX
Expand All @@ -12,11 +10,11 @@ XX

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily.
Follow this checklist to help us incorporate your contribution quickly and easily:

- [x] Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
- [ ] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
- [x] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/ROCKETMQ/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
- [ ] Format the pull request title like `[ROCKETMQ-XXX] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
- [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
- [ ] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
- [ ] Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
- [ ] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration-test pass.
- [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
6 changes: 5 additions & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0</version>
<version>4.4.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -68,6 +68,10 @@
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
240 changes: 219 additions & 21 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Large diffs are not rendered by default.

32 changes: 20 additions & 12 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
Expand All @@ -31,6 +26,8 @@
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
Expand All @@ -41,16 +38,21 @@
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;

public class BrokerStartup {
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
public static Logger log;
public static InternalLogger log;

public static void main(String[] args) {
start(createBrokerController(args));
Expand All @@ -69,6 +71,7 @@ public static BrokerController start(BrokerController controller) {
}

log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
Expand All @@ -78,6 +81,12 @@ public static BrokerController start(BrokerController controller) {
return null;
}

public static void shutdown(final BrokerController controller) {
if (null != controller) {
controller.shutdown();
}
}

public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

Expand Down Expand Up @@ -134,8 +143,7 @@ public static BrokerController createBrokerController(String[] args) {
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ " variable in your environment to match the location of the RocketMQ installation");
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}

Expand Down Expand Up @@ -178,22 +186,22 @@ public static BrokerController createBrokerController(String[] args) {
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

if (commandLine.hasOption('p')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}

log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;

private ScheduledExecutorService scheduledExecutorService = Executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerGroupInfo {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,33 @@
package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();

private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() {
}

Expand Down Expand Up @@ -185,4 +191,36 @@ public void unregisterProducer(final String group, final ClientChannelInfo clien
log.error("", e);
}
}

public Channel getAvaliableChannel(String groupId) {
HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
List<Channel> channelList = new ArrayList<Channel>();
if (channelClientChannelInfoHashMap != null) {
for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
channelList.add(channel);
}
int size = channelList.size();
if (0 == size) {
log.warn("Channel list is empty. groupId={}", groupId);
return null;
}

int index = positiveAtomicCounter.incrementAndGet() % size;
Channel channel = channelList.get(index);
int count = 0;
boolean isOk = channel.isActive() && channel.isWritable();
while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
if (isOk) {
return channel;
}
index = (++index) % size;
channel = channelList.get(index);
isOk = channel.isActive() && channel.isWritable();
}
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,15 @@
package org.apache.rocketmq.broker.client.net;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueForC;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand All @@ -45,51 +37,45 @@
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

public class Broker2Client {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;

public Broker2Client(BrokerController brokerController) {
this.brokerController = brokerController;
}

public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final SelectMappedBufferResult selectMappedBufferResult) {
final MessageExt messageExt) throws Exception {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.markOnewayRPC();

request.setBody(MessageDecoder.encode(messageExt, false));
try {
FileRegion fileRegion =
new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
selectMappedBufferResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release();
if (!future.isSuccess()) {
log.error("invokeProducer failed,", future.cause());
}
}
});
} catch (Throwable e) {
log.error("invokeProducer exception", e);
selectMappedBufferResult.release();
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}", group, messageExt.getMsgId(), e.getMessage());
}
}

public RemotingCommand callClient(final Channel channel,
final RemotingCommand request
final RemotingCommand request
) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
}
Expand Down Expand Up @@ -119,7 +105,7 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b
}

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
boolean isC) {
boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebalanceLockManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
"rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
private final Lock lock = new ReentrantLock();
Expand Down
Loading

0 comments on commit 2b692c9

Please sign in to comment.