From 3650c0654e00d00f28626798afd1dd0f9a2e9359 Mon Sep 17 00:00:00 2001 From: zhaixiaoxiang Date: Wed, 20 Mar 2019 15:52:29 +0800 Subject: [PATCH] Add unit test for unpack and stick pack of dubbo and telent --- .../transport/netty4/NettyCodecAdapter.java | 2 +- .../dubbo/decode/DubboTelnetDecodeTest.java | 477 ++++++++++++++++++ .../dubbo/decode/LocalEmbeddedChannel.java | 35 ++ .../protocol/dubbo/decode/MockChannel.java | 115 +++++ .../dubbo/decode/MockChannelHandler.java | 61 +++ .../protocol/dubbo/decode/MockHandler.java | 40 ++ 6 files changed, 729 insertions(+), 1 deletion(-) create mode 100644 dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java create mode 100644 dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java create mode 100644 dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java create mode 100644 dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java create mode 100644 dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java index 5f3b7848079..e0e4513c5c6 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java @@ -33,7 +33,7 @@ /** * NettyCodecAdapter. */ -final class NettyCodecAdapter { +final public class NettyCodecAdapter { private final ChannelHandler encoder = new InternalEncoder(); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java new file mode 100644 index 00000000000..e54d2e6508a --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.dubbo.decode; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.remoting.Codec2; +import org.apache.dubbo.remoting.buffer.ChannelBuffer; +import org.apache.dubbo.remoting.exchange.ExchangeChannel; +import org.apache.dubbo.remoting.exchange.Request; +import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; +import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler; +import org.apache.dubbo.remoting.transport.DecodeHandler; +import org.apache.dubbo.remoting.transport.MultiMessageHandler; +import org.apache.dubbo.remoting.transport.netty4.NettyBackedChannelBuffer; +import org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation; +import org.apache.dubbo.rpc.protocol.dubbo.DubboCodec; +import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * These junit tests aim to test unpack and stick pack of dubbo and telnet + */ +public class DubboTelnetDecodeTest { + private static AtomicInteger dubbo = new AtomicInteger(0); + + private static AtomicInteger telnet = new AtomicInteger(0); + + private static AtomicInteger telnetDubbo = new AtomicInteger(0); + + private static AtomicInteger dubboDubbo = new AtomicInteger(0); + + private static AtomicInteger dubboTelnet = new AtomicInteger(0); + + private static AtomicInteger telnetTelnet = new AtomicInteger(0); + + /** + * just dubbo request + * + * @throws InterruptedException + */ + @Test + public void testDubboDecode() throws InterruptedException, IOException { + ByteBuf dubboByteBuf = createDubboByteBuf(); + + EmbeddedChannel ch = null; + try { + Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo"); + URL url = new URL("dubbo", "localhost", 22226); + NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler()); + + MockHandler mockHandler = new MockHandler(null, + new MultiMessageHandler( + new DecodeHandler( + new HeaderExchangeHandler(new ExchangeHandlerAdapter() { + @Override + public CompletableFuture reply(ExchangeChannel channel, Object msg) { + if (checkDubboDecoded(msg)) { + dubbo.incrementAndGet(); + } + return null; + } + })))); + + ch = new LocalEmbeddedChannel(); + ch.pipeline() + .addLast("decoder", adapter.getDecoder()) + .addLast("handler", mockHandler); + + ch.writeInbound(dubboByteBuf); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (ch != null) { + ch.close().await(200, TimeUnit.MILLISECONDS); + } + } + + TimeUnit.MILLISECONDS.sleep(100); + + Assertions.assertEquals(1, dubbo.get()); + } + + /** + * just telnet request + * + * @throws InterruptedException + */ + @Test + public void testTelnetDecode() throws InterruptedException { + ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("ls\r\n".getBytes()); + + EmbeddedChannel ch = null; + try { + Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo"); + URL url = new URL("dubbo", "localhost", 22226); + NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler()); + + MockHandler mockHandler = new MockHandler((msg) -> { + if (checkTelnetDecoded(msg)) { + telnet.incrementAndGet(); + } + }, + new MultiMessageHandler( + new DecodeHandler( + new HeaderExchangeHandler(new ExchangeHandlerAdapter() { + @Override + public CompletableFuture reply(ExchangeChannel channel, Object msg) { + return null; + } + })))); + + ch = new LocalEmbeddedChannel(); + ch.pipeline() + .addLast("decoder", adapter.getDecoder()) + .addLast("handler", mockHandler); + + ch.writeInbound(telnetByteBuf); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (ch != null) { + ch.close().await(200, TimeUnit.MILLISECONDS); + } + } + + TimeUnit.MILLISECONDS.sleep(100); + + Assertions.assertEquals(1, telnet.get()); + } + + /** + * telnet and dubbo request + * + *

+ * First ByteBuf: + * +--------------------------------------------------+ + * | telnet(incomplete) | + * +--------------------------------------------------+ + *

+ * + * Second ByteBuf: + * +--------------------------++----------------------+ + * | telnet(the remaining) || dubbo(complete) | + * +--------------------------++----------------------+ + * || + * Magic Code + * + * @throws InterruptedException + */ + @Test + public void testTelnetDubboDecoded() throws InterruptedException, IOException { + ByteBuf dubboByteBuf = createDubboByteBuf(); + + ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("ls\r".getBytes()); + EmbeddedChannel ch = null; + try { + Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo"); + URL url = new URL("dubbo", "localhost", 22226); + NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler()); + + MockHandler mockHandler = new MockHandler((msg) -> { + if (checkTelnetDecoded(msg)) { + telnetDubbo.incrementAndGet(); + } + }, + new MultiMessageHandler( + new DecodeHandler( + new HeaderExchangeHandler(new ExchangeHandlerAdapter() { + @Override + public CompletableFuture reply(ExchangeChannel channel, Object msg) { + if (checkDubboDecoded(msg)) { + telnetDubbo.incrementAndGet(); + } + return null; + } + })))); + + ch = new LocalEmbeddedChannel(); + ch.pipeline() + .addLast("decoder", adapter.getDecoder()) + .addLast("handler", mockHandler); + + ch.writeInbound(telnetByteBuf); + ch.writeInbound(Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("\n".getBytes()), dubboByteBuf)); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (ch != null) { + ch.close().await(200, TimeUnit.MILLISECONDS); + } + } + + TimeUnit.MILLISECONDS.sleep(100); + + Assertions.assertEquals(2, telnetDubbo.get()); + } + + /** + * NOTE: This test case actually will fail, but the probability of this case is very small, + * and users should use telnet in new QOS port(default port is 22222) since dubbo 2.5.8, + * so we could ignore this problem. + * + *

+ * telnet and telnet request + * + *

+ * First ByteBuf (firstByteBuf): + * +--------------------------------------------------+ + * | telnet(incomplete) | + * +--------------------------------------------------+ + *

+ * + * Second ByteBuf (secondByteBuf): + * +--------------------------------------------------+ + * | telnet(the remaining) | telnet(complete) | + * +--------------------------------------------------+ + * + * @throws InterruptedException + */ + // @Test + public void testTelnetTelnetDecoded() throws InterruptedException { + ByteBuf firstByteBuf = Unpooled.wrappedBuffer("ls\r".getBytes()); + ByteBuf secondByteBuf = Unpooled.wrappedBuffer("\nls\r\n".getBytes()); + + EmbeddedChannel ch = null; + try { + Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo"); + URL url = new URL("dubbo", "localhost", 22226); + NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler()); + + MockHandler mockHandler = new MockHandler((msg) -> { + if (checkTelnetDecoded(msg)) { + telnetTelnet.incrementAndGet(); + } + }, + new MultiMessageHandler( + new DecodeHandler( + new HeaderExchangeHandler(new ExchangeHandlerAdapter() { + @Override + public CompletableFuture reply(ExchangeChannel channel, Object msg) { + return null; + } + })))); + + ch = new LocalEmbeddedChannel(); + ch.pipeline() + .addLast("decoder", adapter.getDecoder()) + .addLast("handler", mockHandler); + + ch.writeInbound(firstByteBuf); + ch.writeInbound(secondByteBuf); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (ch != null) { + ch.close().await(200, TimeUnit.MILLISECONDS); + } + } + + TimeUnit.MILLISECONDS.sleep(100); + + Assertions.assertEquals(2, telnetTelnet.get()); + } + + /** + * dubbo and dubbo request + * + *

+ * First ByteBuf (firstDubboByteBuf): + * ++-------------------------------------------------+ + * || dubbo(incomplete) | + * ++-------------------------------------------------+ + * || + * Magic Code + *

+ * + *

+ * Second ByteBuf (secondDubboByteBuf): + * +-------------------------++-----------------------+ + * | dubbo(the remaining) || dubbo(complete) | + * +-------------------------++-----------------------+ + * || + * Magic Code + * + * @throws InterruptedException + */ + @Test + public void testDubboDubboDecoded() throws InterruptedException, IOException { + ByteBuf dubboByteBuf = createDubboByteBuf(); + + ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50); + ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes() - 50); + ByteBuf secondDubboByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, dubboByteBuf); + + + EmbeddedChannel ch = null; + try { + Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo"); + URL url = new URL("dubbo", "localhost", 22226); + NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler()); + + MockHandler mockHandler = new MockHandler(null, + new MultiMessageHandler( + new DecodeHandler( + new HeaderExchangeHandler(new ExchangeHandlerAdapter() { + @Override + public CompletableFuture reply(ExchangeChannel channel, Object msg) { + if (checkDubboDecoded(msg)) { + dubboDubbo.incrementAndGet(); + } + return null; + } + })))); + + ch = new LocalEmbeddedChannel(); + ch.pipeline() + .addLast("decoder", adapter.getDecoder()) + .addLast("handler", mockHandler); + + ch.writeInbound(firstDubboByteBuf); + ch.writeInbound(secondDubboByteBuf); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (ch != null) { + ch.close().await(200, TimeUnit.MILLISECONDS); + } + } + + TimeUnit.MILLISECONDS.sleep(100); + + Assertions.assertEquals(2, dubboDubbo.get()); + } + + /** + * dubbo and telnet request + * + *

+ * First ByteBuf: + * ++-------------------------------------------------+ + * || dubbo(incomplete) | + * ++-------------------------------------------------+ + * || + * Magic Code + * + *

+ * Second ByteBuf: + * +--------------------------------------------------+ + * | dubbo(the remaining) | telnet(complete) | + * +--------------------------------------------------+ + * + * @throws InterruptedException + */ + @Test + public void testDubboTelnetDecoded() throws InterruptedException, IOException { + ByteBuf dubboByteBuf = createDubboByteBuf(); + ByteBuf firstDubboByteBuf = dubboByteBuf.copy(0, 50); + ByteBuf secondLeftDubboByteBuf = dubboByteBuf.copy(50, dubboByteBuf.readableBytes()); + + ByteBuf telnetByteBuf = Unpooled.wrappedBuffer("\r\n".getBytes()); + ByteBuf secondByteBuf = Unpooled.wrappedBuffer(secondLeftDubboByteBuf, telnetByteBuf); + + EmbeddedChannel ch = null; + try { + Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo"); + URL url = new URL("dubbo", "localhost", 22226); + NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, new MockChannelHandler()); + + MockHandler mockHandler = new MockHandler((msg) -> { + if (checkTelnetDecoded(msg)) { + dubboTelnet.incrementAndGet(); + } + }, + new MultiMessageHandler( + new DecodeHandler( + new HeaderExchangeHandler(new ExchangeHandlerAdapter() { + @Override + public CompletableFuture reply(ExchangeChannel channel, Object msg) { + if (checkDubboDecoded(msg)) { + dubboTelnet.incrementAndGet(); + } + return null; + } + })))); + + ch = new LocalEmbeddedChannel(); + ch.pipeline() + .addLast("decoder", adapter.getDecoder()) + .addLast("handler", mockHandler); + + ch.writeInbound(firstDubboByteBuf); + ch.writeInbound(secondByteBuf); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (ch != null) { + ch.close().await(200, TimeUnit.MILLISECONDS); + } + } + + TimeUnit.MILLISECONDS.sleep(100); + + Assertions.assertEquals(2, dubboTelnet.get()); + } + + private ByteBuf createDubboByteBuf() throws IOException { + Request request = new Request(); + RpcInvocation rpcInvocation = new RpcInvocation(); + rpcInvocation.setMethodName("sayHello"); + rpcInvocation.setParameterTypes(new Class[]{String.class}); + rpcInvocation.setArguments(new String[]{"dubbo"}); + rpcInvocation.setAttachment("path", DemoService.class.getName()); + rpcInvocation.setAttachment("interface", DemoService.class.getName()); + rpcInvocation.setAttachment("version", "0.0.0"); + + request.setData(rpcInvocation); + request.setVersion("2.0.2"); + + ByteBuf dubboByteBuf = Unpooled.buffer(); + ChannelBuffer buffer = new NettyBackedChannelBuffer(dubboByteBuf); + DubboCodec dubboCodec = new DubboCodec(); + dubboCodec.encode(new MockChannel(), buffer, request); + + return dubboByteBuf; + } + + private static boolean checkTelnetDecoded(Object msg) { + if (msg != null && msg instanceof String && !msg.toString().contains("Unsupported command:")) { + return true; + } + return false; + } + + private static boolean checkDubboDecoded(Object msg) { + if (msg instanceof DecodeableRpcInvocation) { + DecodeableRpcInvocation invocation = (DecodeableRpcInvocation) msg; + if ("sayHello".equals(invocation.getMethodName()) + && invocation.getParameterTypes().length == 1 + && String.class.equals(invocation.getParameterTypes()[0]) + && invocation.getArguments().length == 1 + && "dubbo".equals(invocation.getArguments()[0]) + && DemoService.class.getName().equals(invocation.getAttachment("path")) + && DemoService.class.getName().equals(invocation.getAttachment("interface")) + && "0.0.0".equals(invocation.getAttachment("version"))) { + return true; + } + } + return false; + } +} \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java new file mode 100644 index 00000000000..adc590d7516 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/LocalEmbeddedChannel.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.dubbo.decode; + +import org.apache.dubbo.common.utils.NetUtils; + +import io.netty.channel.embedded.EmbeddedChannel; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class LocalEmbeddedChannel extends EmbeddedChannel { + public SocketAddress localAddress() { + return new InetSocketAddress(20883); + } + + @Override + protected SocketAddress remoteAddress0() { + return new InetSocketAddress(NetUtils.getAvailablePort()); + } +} diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java new file mode 100644 index 00000000000..80a2dc5b016 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannel.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.dubbo.decode; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.remoting.ChannelHandler; +import org.apache.dubbo.remoting.RemotingException; + +import java.net.InetSocketAddress; +import java.util.function.Consumer; + +public class MockChannel implements Channel { + private Consumer consumer; + + public MockChannel() { + + } + + public MockChannel(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public InetSocketAddress getRemoteAddress() { + return new InetSocketAddress(NetUtils.getAvailablePort()); + } + + @Override + public boolean isConnected() { + return false; + } + + @Override + public boolean hasAttribute(String key) { + return false; + } + + @Override + public Object getAttribute(String key) { + return null; + } + + @Override + public void setAttribute(String key, Object value) { + + } + + @Override + public void removeAttribute(String key) { + + } + + @Override + public URL getUrl() { + return new URL("dubbo", "localhost", 20880); + } + + @Override + public ChannelHandler getChannelHandler() { + return null; + } + + @Override + public InetSocketAddress getLocalAddress() { + return new InetSocketAddress(20883); + } + + @Override + public void send(Object message) throws RemotingException { + if (consumer != null) { + consumer.accept(message); + } + } + + @Override + public void send(Object message, boolean sent) throws RemotingException { + + } + + @Override + public void close() { + + } + + @Override + public void close(int timeout) { + + } + + @Override + public void startClose() { + + } + + @Override + public boolean isClosed() { + return false; + } +} diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java new file mode 100644 index 00000000000..492929706dd --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockChannelHandler.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.dubbo.decode; + +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.remoting.ChannelHandler; +import org.apache.dubbo.remoting.RemotingException; + +import java.util.Collections; +import java.util.Set; + +public class MockChannelHandler implements ChannelHandler { + // ConcurrentMap channels = new ConcurrentHashMap(); + ConcurrentHashSet channels = new ConcurrentHashSet(); + + @Override + public void connected(Channel channel) throws RemotingException { + channels.add(channel); + } + + @Override + public void disconnected(Channel channel) throws RemotingException { + channels.remove(channel); + } + + @Override + public void sent(Channel channel, Object message) throws RemotingException { + channel.send(message); + } + + @Override + public void received(Channel channel, Object message) throws RemotingException { + //echo + channel.send(message); + } + + @Override + public void caught(Channel channel, Throwable exception) throws RemotingException { + throw new RemotingException(channel, exception); + + } + + public Set getChannels() { + return Collections.unmodifiableSet(channels); + } +} diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java new file mode 100644 index 00000000000..f4f432a733e --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/MockHandler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.dubbo.decode; + +import org.apache.dubbo.remoting.ChannelHandler; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; + +import java.util.function.Consumer; + +public class MockHandler extends ChannelDuplexHandler { + private final Consumer consumer; + + private final ChannelHandler handler; + + public MockHandler(Consumer consumer, ChannelHandler handler) { + this.consumer = consumer; + this.handler = handler; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + this.handler.received(new MockChannel(consumer), msg); + } +}