Skip to content

Commit

Permalink
[fix][ws] Replace incorrect ping/pong implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Jun 30, 2023
1 parent d9ff063 commit a6be8c8
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -1068,12 +1067,6 @@ private void addWebSocketServiceHandler(WebService webService,
new ServletHolder(readerWebSocketServlet), true, attributeMap);
webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet), true, attributeMap);

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -315,12 +314,6 @@ public static void addWebServerHandlers(WebServer server,
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet));
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractWebSocketHandler extends WebSocketAdapter implements Closeable {
public abstract class AbstractWebSocketHandler extends PingPongWebSocketAdapter implements Closeable {

protected final WebSocketService service;
protected final HttpServletRequest request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PingPongHandler extends WebSocketAdapter implements WebSocketPingPongListener {
/**
* A WebSocketAdapter that responds to ping messages with pong messages.
*/
public class PingPongWebSocketAdapter extends WebSocketAdapter implements WebSocketPingPongListener {

private static final Logger log = LoggerFactory.getLogger(PingPongHandler.class);
private static final Logger log = LoggerFactory.getLogger(PingPongWebSocketAdapter.class);

@Override
public void onWebSocketPing(ByteBuffer payload) {
Expand All @@ -44,7 +47,6 @@ public void onWebSocketPing(ByteBuffer payload) {

@Override
public void onWebSocketPong(ByteBuffer payload) {

// no-op
}

}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -91,16 +90,13 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service));

proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new WebSocketReaderServlet(service));
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new WebSocketPingPongServlet(service));

proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import org.apache.pulsar.broker.ServiceConfiguration;
import lombok.Cleanup;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Server;
Expand All @@ -37,14 +37,14 @@
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class PingPongHandlerTest {
public class PingPongWebSocketAdapterTest {

private static Server server;

Expand All @@ -60,16 +60,9 @@ public static void setup() throws Exception {
connectors.forEach(c -> c.setAcceptQueueSize(1024 / connectors.size()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));

WebSocketService service = mock(WebSocketService.class);
ServiceConfiguration config = mock(ServiceConfiguration.class);

when(service.getConfig()).thenReturn(config);
when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);

ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service));
ServletHolder servletHolder = new ServletHolder("ws-events", new MyServlet());
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
context.setContextPath("/ws");
context.addServlet(servletHolder, "/*");
server.setHandler(context);
try {
Expand All @@ -90,10 +83,11 @@ public static void tearDown() throws Exception {
@Test
public void testPingPong() throws Exception {
HttpClient httpClient = new HttpClient();
@Cleanup(value = "stop")
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
String webSocketUri = "ws://localhost:8080/ws/pingpong";
String webSocketUri = "ws://localhost:8080/ws";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
assertTrue(myWebSocket.getResponse().contains("test"));
Expand Down Expand Up @@ -129,4 +123,11 @@ public String getResponse() throws InterruptedException {
return incomingMessages.take();
}
}

public static class MyServlet extends WebSocketServlet {
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(PingPongWebSocketAdapter.class);
}
}
}

0 comments on commit a6be8c8

Please sign in to comment.