Skip to content

Commit

Permalink
Merge branch 'feature_acl' into develop-acl
Browse files Browse the repository at this point in the history
  • Loading branch information
dongeforever authored Oct 27, 2018
2 parents 56f81dd + 4915871 commit aea7461
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

public abstract class AbstractPluginMessageStore implements MessageStore {
protected MessageStore next = null;
Expand Down Expand Up @@ -246,4 +247,9 @@ public LinkedList<CommitLogDispatcher> getDispatcherList() {
public ConsumeQueue getConsumeQueue(String topic, int queueId) {
return next.getConsumeQueue(topic, queueId);
}

@Override
public BrokerStatsManager getBrokerStatsManager() {
return next.getBrokerStatsManager();
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;

public class AdminBrokerProcessor implements NettyRequestProcessor {
Expand Down Expand Up @@ -760,12 +761,19 @@ private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, Remoting
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Delay offset not supported in this messagetore");
return response;
}

String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
if (content != null && content.length() > 0) {
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
log.error("get all delay offset from master error.", e);
log.error("Get all delay offset from master error.", e);

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e);
Expand Down Expand Up @@ -1051,7 +1059,7 @@ private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx,
final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
MessageStore messageStore = this.brokerController.getMessageStore();

StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
if (null == statsItem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void testGetLocalInetAddress() throws Exception {
List<String> localInetAddress = MixAll.getLocalInetAddress();
String local = InetAddress.getLocalHost().getHostAddress();
assertThat(localInetAddress).contains("127.0.0.1");

assertThat(local).isNotNull();
}

@Test
Expand Down
20 changes: 3 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@
<skipAfterFailureCount>1</skipAfterFailureCount>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<excludes>
<exclude>**/IT*.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
Expand All @@ -335,23 +338,6 @@
<artifactId>sonar-maven-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<skipAfterFailureCount>1</skipAfterFailureCount>
<excludes>
<exclude>**/NormalMsgDelayIT.java</exclude>
</excludes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract {
*/
protected volatile SslContext sslContext;

/**
* custom rpc hooks
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();



static {
NettyLogger.initNettyLogger();
}
Expand Down Expand Up @@ -158,6 +166,23 @@ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand ms
}
}

protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doBeforeRequest(addr, request);
}
}
}

protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doAfterResponse(addr, request, response);
}
}
}


/**
* Process incoming request command issued by remote peer.
*
Expand All @@ -174,15 +199,9 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}

doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

if (!cmd.isOnewayRPC()) {
if (response != null) {
Expand Down Expand Up @@ -314,12 +333,29 @@ public void run() {
}
}



/**
* Custom RPC hook.
* Just be compatible with the previous version, use getRPCHooks instead.
*/
@Deprecated
protected RPCHook getRPCHook() {
if (rpcHooks.size() > 0) {
return rpcHooks.get(0);
}
return null;
}

/**
* Custom RPC hooks.
*
* @return RPC hook if specified; null otherwise.
* @return RPC hooks if specified; null otherwise.
*/
public abstract RPCHook getRPCHook();
public List<RPCHook> getRPCHooks() {
return rpcHooks;
}


/**
* This method specifies thread pool to use while invoking callback methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
Expand All @@ -53,6 +54,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
Expand All @@ -64,8 +67,6 @@
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
Expand Down Expand Up @@ -94,7 +95,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;

public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
Expand Down Expand Up @@ -283,7 +283,9 @@ public void closeChannel(final String addr, final Channel channel) {

@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
if (!rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}

public void closeChannel(final Channel channel) {
Expand Down Expand Up @@ -357,24 +359,22 @@ public void updateNameServerAddressList(List<String> addrs) {
}
}



@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
Expand Down Expand Up @@ -522,9 +522,7 @@ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
Expand All @@ -547,9 +545,7 @@ public void invokeOneway(String addr, RemotingCommand request, long timeoutMilli
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
Expand Down Expand Up @@ -592,10 +588,6 @@ public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}

@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}

@Override
public ExecutorService getCallbackExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -75,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;

private RPCHook rpcHook;

private int port = 0;

Expand Down Expand Up @@ -266,7 +267,9 @@ public void shutdown() {

@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
if (!rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}

@Override
Expand Down Expand Up @@ -318,10 +321,6 @@ public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}

@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}

@Override
public ExecutorService getCallbackExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,7 @@ public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
cq.putMessagePositionInfoWrapper(dispatchRequest);
}

@Override
public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/**
* This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
Expand Down Expand Up @@ -358,4 +359,11 @@ QueryMessageResult queryMessage(final String topic, final String key, final int
* @return Consume queue.
*/
ConsumeQueue getConsumeQueue(String topic, int queueId);

/**
* Get BrokerStatsManager of the messageStore.
*
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
}

0 comments on commit aea7461

Please sign in to comment.