diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 00dfad40191bd..59590e037990f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -173,7 +173,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider; import org.apache.pulsar.websocket.WebSocketConsumerServlet; -import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -1068,12 +1067,6 @@ private void addWebSocketServiceHandler(WebService webService, new ServletHolder(readerWebSocketServlet), true, attributeMap); webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new ServletHolder(readerWebSocketServlet), true, attributeMap); - - final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService); - webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH, - new ServletHolder(pingPongWebSocketServlet), true, attributeMap); - webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2, - new ServletHolder(pingPongWebSocketServlet), true, attributeMap); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index beee9f1a4f763..46a5efef66853 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -51,7 +51,6 @@ import org.apache.pulsar.common.util.ShutdownUtil; import org.apache.pulsar.proxy.stats.ProxyStats; import org.apache.pulsar.websocket.WebSocketConsumerServlet; -import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -315,12 +314,6 @@ public static void addWebServerHandlers(WebServer server, new ServletHolder(readerWebSocketServlet)); server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new ServletHolder(readerWebSocketServlet)); - - final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService); - server.addServlet(WebSocketPingPongServlet.SERVLET_PATH, - new ServletHolder(pingPongWebSocketServlet)); - server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2, - new ServletHolder(pingPongWebSocketServlet)); } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 4dcfc17096448..def58be6df372 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -76,18 +76,6 @@ private String computeWsBasePath() { return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get()); } - @Test - public void testEnableWebSocketServer() throws Exception { - HttpClient httpClient = new HttpClient(); - WebSocketClient webSocketClient = new WebSocketClient(httpClient); - webSocketClient.start(); - MyWebSocket myWebSocket = new MyWebSocket(); - String webSocketUri = computeWsBasePath() + "/pingpong"; - Future sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri)); - System.out.println("uri" + webSocketUri); - sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes())); - assertTrue(myWebSocket.getResponse().contains("ping")); - } @Test public void testProducer() throws Exception { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java deleted file mode 100644 index 870630abc8889..0000000000000 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.websocket; - -import java.io.IOException; -import java.nio.ByteBuffer; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; -import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PingPongHandler extends WebSocketAdapter implements WebSocketPingPongListener { - - private static final Logger log = LoggerFactory.getLogger(PingPongHandler.class); - - @Override - public void onWebSocketPing(ByteBuffer payload) { - try { - if (log.isDebugEnabled()) { - log.debug("PING: {}", BufferUtil.toDetailString(payload)); - } - getRemote().sendPong(payload); - } catch (IOException e) { - log.warn("Failed to send pong: {}", e.getMessage()); - } - } - - @Override - public void onWebSocketPong(ByteBuffer payload) { - - } - -} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java deleted file mode 100644 index cc2d79ee541ba..0000000000000 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.websocket; - -import org.eclipse.jetty.websocket.servlet.WebSocketServlet; -import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; - -public class WebSocketPingPongServlet extends WebSocketServlet { - private static final long serialVersionUID = 1L; - - public static final String SERVLET_PATH = "/ws/pingpong"; - public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong"; - - private final transient WebSocketService service; - - public WebSocketPingPongServlet(WebSocketService service) { - this.service = service; - } - - @Override - public void configure(WebSocketServletFactory factory) { - factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize()); - if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) { - factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis()); - } - factory.setCreator((request, response) -> new PingPongHandler()); - } -} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java index fbcecc0642e34..6231ef1a2aa41 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java @@ -29,7 +29,6 @@ import org.apache.pulsar.common.util.CmdGenerateDocs; import org.apache.pulsar.common.util.ShutdownUtil; import org.apache.pulsar.websocket.WebSocketConsumerServlet; -import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -91,7 +90,6 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service)); proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service)); proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service)); - proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service)); proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2, new WebSocketProducerServlet(service)); @@ -99,8 +97,6 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro new WebSocketConsumerServlet(service)); proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new WebSocketReaderServlet(service)); - proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2, - new WebSocketPingPongServlet(service)); proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class); proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class); diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java deleted file mode 100644 index 662009f1aab1a..0000000000000 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.websocket; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Future; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.web.WebExecutorThreadPool; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; -import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.eclipse.jetty.websocket.client.WebSocketClient; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertTrue; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -public class PingPongHandlerTest { - - private static Server server; - - private static final WebExecutorThreadPool executor = new WebExecutorThreadPool(6, "pulsar-websocket-web-test"); - - @BeforeClass - public static void setup() throws Exception { - server = new Server(executor); - List connectors = new ArrayList<>(); - ServerConnector connector = new ServerConnector(server); - connector.setPort(8080); - connectors.add(connector); - connectors.forEach(c -> c.setAcceptQueueSize(1024 / connectors.size())); - server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); - - WebSocketService service = mock(WebSocketService.class); - ServiceConfiguration config = mock(ServiceConfiguration.class); - - when(service.getConfig()).thenReturn(config); - when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576); - when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000); - - ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service)); - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH); - context.addServlet(servletHolder, "/*"); - server.setHandler(context); - try { - server.start(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @AfterClass(alwaysRun = true) - public static void tearDown() throws Exception { - if (server != null) { - server.stop(); - } - executor.stop(); - } - - @Test - public void testPingPong() throws Exception { - HttpClient httpClient = new HttpClient(); - WebSocketClient webSocketClient = new WebSocketClient(httpClient); - webSocketClient.start(); - MyWebSocket myWebSocket = new MyWebSocket(); - String webSocketUri = "ws://localhost:8080/ws/pingpong"; - Future sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri)); - sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes())); - assertTrue(myWebSocket.getResponse().contains("test")); - } - - @WebSocket - public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener { - - ArrayBlockingQueue incomingMessages = new ArrayBlockingQueue<>(10); - - @Override - public void onWebSocketClose(int i, String s) { - } - - @Override - public void onWebSocketConnect(Session session) { - } - - @Override - public void onWebSocketError(Throwable throwable) { - } - - @Override - public void onWebSocketPing(ByteBuffer payload) { - } - - @Override - public void onWebSocketPong(ByteBuffer payload) { - incomingMessages.add(BufferUtil.toDetailString(payload)); - } - - public String getResponse() throws InterruptedException { - return incomingMessages.take(); - } - } -}