Skip to content

Commit

Permalink
[ISSUE#525]add aclClient PRCHook for message track (apache#638)
Browse files Browse the repository at this point in the history
* prepare to separate production-ready projects from the external projects

* Update fastjson to the latest stable version

* Clean code, don't list the default config in JVM

* Update README.md

* update the year info in NOTICE

* Release semaphore when timeout

* [ISSUE#525]add aclClient PRCHook for message track

* [ISSUE#525]add the apache license text,remove the merged from master branch

* [ISSUE#525]add aclClient PRCHook for message track,remove the merged content of notice and readme.md from master branch,add some unit test to increase the code coverage.
  • Loading branch information
zongtanghu authored and dongeforever committed Dec 28, 2018
1 parent bc72e36 commit b8a511c
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 8 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ It offers a variety of features:
----------

## Apache RocketMQ Community
* [RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)

[RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)
----------

## Contributing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;

import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,16 @@ public void checkAdminCodeTest() {
}
}
}

@Test
public void AclExceptionTest(){
AclException aclException = new AclException("CAL_SIGNATURE_FAILED",10015);
AclException aclExceptionWithMessage = new AclException("CAL_SIGNATURE_FAILED",10015,"CAL_SIGNATURE_FAILED Exception");
Assert.assertEquals(aclException.getCode(),10015);
Assert.assertEquals(aclExceptionWithMessage.getStatus(),"CAL_SIGNATURE_FAILED");
aclException.setCode(10016);
Assert.assertEquals(aclException.getCode(),10016);
aclException.setStatus("netaddress examine scope Exception netaddress");
Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress");
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common;

import org.junit.Assert;
Expand Down Expand Up @@ -25,5 +41,51 @@ public void updateContentTest(){
sessionCredentials.updateContent(properties);
}

@Test
public void SessionCredentialHashCodeTest(){
SessionCredentials sessionCredentials=new SessionCredentials();
Properties properties=new Properties();
properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredentials.updateContent(properties);
Assert.assertEquals(sessionCredentials.hashCode(),353652211);
}

@Test
public void SessionCredentialEqualsTest(){
SessionCredentials sessionCredential1 =new SessionCredentials();
Properties properties1=new Properties();
properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential1.updateContent(properties1);

SessionCredentials sessionCredential2 =new SessionCredentials();
Properties properties2=new Properties();
properties2.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties2.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties2.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential2.updateContent(properties2);

Assert.assertTrue(sessionCredential2.equals(sessionCredential1));
sessionCredential2.setSecretKey("1234567899");
sessionCredential2.setSignature("1234567899");
Assert.assertFalse(sessionCredential2.equals(sessionCredential1));
}

@Test
public void SessionCredentialToStringTest(){
SessionCredentials sessionCredential1 =new SessionCredentials();
Properties properties1=new Properties();
properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential1.updateContent(properties1);

Assert.assertEquals(sessionCredential1.toString(),
"SessionCredentials [accessKey=RocketMQ, secretKey=12345678, signature=null, SecurityToken=abcd]");
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.broker.pagecache;

import java.nio.ByteBuffer;
import org.apache.rocketmq.store.GetMessageResult;
import org.junit.Assert;
import org.junit.Test;

public class ManyMessageTransferTest {

@Test
public void ManyMessageTransferBuilderTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
}

@Test
public void ManyMessageTransferPosTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
Assert.assertEquals(manyMessageTransfer.position(),4);
}

@Test
public void ManyMessageTransferCountTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);

Assert.assertEquals(manyMessageTransfer.count(),20);

}

@Test
public void ManyMessageTransferCloseTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
manyMessageTransfer.close();
manyMessageTransfer.deallocate();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.broker.pagecache;

import java.nio.ByteBuffer;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.junit.Assert;
import org.junit.Test;

public class OneMessageTransferTest {

@Test
public void OneMessageTransferTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
}

@Test
public void OneMessageTransferCountTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
Assert.assertEquals(manyMessageTransfer.count(),40);
}

@Test
public void OneMessageTransferPosTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
Assert.assertEquals(manyMessageTransfer.position(),8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean ms
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.RPCHook;

/**
* Created by zongtanghu on 2018/11/6.
Expand All @@ -74,7 +75,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;

public AsyncArrayDispatcher(Properties properties) throws MQClientException {
public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException {
dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"));
// queueSize is greater than or equal to the n power of 2 of value
Expand All @@ -92,7 +93,7 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException {
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
}

public String getTraceTopicName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.remoting.RPCHook;

public class TrackTraceProducerFactory {

Expand All @@ -33,10 +34,10 @@ public class TrackTraceProducerFactory {
private static DefaultMQProducer traceProducer;


public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
if (traceProducer == null) {

traceProducer = new DefaultMQProducer();
traceProducer = new DefaultMQProducer(rpcHook);
traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME);
traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@
<exclude>src/test/resources/certs/*</exclude>
<exclude>src/test/**/*.log</exclude>
<exclude>src/test/resources/META-INF/service/*</exclude>
<exclude>src/main/resources/META-INF/service/*</exclude>
<exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude>
</excludes>
Expand Down

0 comments on commit b8a511c

Please sign in to comment.