Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][ws] Replace incorrect ping/pong implementation #52

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -1068,12 +1067,6 @@ private void addWebSocketServiceHandler(WebService webService,
new ServletHolder(readerWebSocketServlet), true, attributeMap);
webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet), true, attributeMap);

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -315,12 +314,6 @@ public static void addWebServerHandlers(WebServer server,
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet));
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,6 @@ private String computeWsBasePath() {
return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get());
}

@Test
public void testEnableWebSocketServer() throws Exception {
HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
String webSocketUri = computeWsBasePath() + "/pingpong";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
System.out.println("uri" + webSocketUri);
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
assertTrue(myWebSocket.getResponse().contains("ping"));
}

@Test
public void testProducer() throws Exception {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -91,16 +90,13 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service));

proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new WebSocketReaderServlet(service));
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new WebSocketPingPongServlet(service));

proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.apache.pulsar.websocket;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Server;
Expand All @@ -40,11 +43,18 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class PingPongHandlerTest {
/**
* Test to ensure {@link AbstractWebSocketHandler} has ping/pong support
*/
public class PingPongSupportTest {

private static Server server;

Expand All @@ -67,9 +77,9 @@ public static void setup() throws Exception {
when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);

ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service));
ServletHolder servletHolder = new ServletHolder("ws-events", new GenericWebSocketServlet(service));
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
context.setContextPath("/ws");
context.addServlet(servletHolder, "/*");
server.setHandler(context);
try {
Expand All @@ -87,18 +97,60 @@ public static void tearDown() throws Exception {
executor.stop();
}

@Test
public void testPingPong() throws Exception {
/**
* We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are
* not testing these implementations, but the ping/pong support is guaranteed as part of the framework.
*/
@DataProvider(name = "endpoint")
public static Object[][] cacheEnable() {
return new Object[][] { { "producer" }, { "consumer" }, { "reader" } };
}

@Test(dataProvider = "endpoint")
public void testPingPong(String endpoint) throws Exception {
HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
String webSocketUri = "ws://localhost:8080/ws/pingpong";
String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint + "/persistent/my-property/my-ns/my-topic";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
assertTrue(myWebSocket.getResponse().contains("test"));
}

public static class GenericWebSocketHandler extends AbstractWebSocketHandler {

public GenericWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
}

@Override
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
return true;
}

@Override
public void close() throws IOException {

}
}

public static class GenericWebSocketServlet extends WebSocketServlet {

private static final long serialVersionUID = 1L;
private final WebSocketService service;

public GenericWebSocketServlet(WebSocketService service) {
this.service = service;
}

@Override
public void configure(WebSocketServletFactory factory) {
factory.setCreator((request, response) ->
new GenericWebSocketHandler(service, request.getHttpServletRequest(), response));
}
}

@WebSocket
public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {

Expand Down